fhir_sdk/client/fhir/
paging.rs

1//! FHIR paging functionality, e.g. for search results.
2
3use std::{any::type_name, fmt::Debug, marker::PhantomData};
4
5use futures::{Stream, StreamExt, TryStreamExt, stream};
6use reqwest::{StatusCode, Url, header::HeaderValue};
7
8use super::{Client, Error};
9use crate::{
10	extensions::{BundleEntryExt, BundleEntryRequestExt, BundleExt, SearchEntryModeExt},
11	version::FhirVersion,
12};
13
/// Type alias for the `BundleEntry` type for any version, i.e. the `Entry` associated type that
/// the version's `Bundle` exposes through [BundleExt].
type BundleEntry<V> = <<V as FhirVersion>::Bundle as BundleExt>::Entry;
16
/// Wrapper around `Bundle`s that have multiple pages of results, e.g. search results, resource
/// history, etc.
///
/// `V` selects the FHIR version (and thereby the concrete `Bundle` type that is wrapped). `R` is
/// the resource type that the `matches*` accessors convert entries into; it is tracked at the
/// type level only and no value of `R` is stored.
pub struct Page<V: FhirVersion, R> {
	/// The FHIR client to make further requests for the next pages and resources.
	client: Client<V>,
	/// The inner Bundle result.
	bundle: V::Bundle,
	/// The correlation ID to send when fetching all further pages.
	correlation_id: HeaderValue,

	/// The resource type to return in matches.
	_resource_type: PhantomData<R>,
}
30
31impl<V: FhirVersion, R> Page<V, R>
32where
33	(StatusCode, V::OperationOutcome): Into<Error>,
34	R: TryFrom<V::Resource> + Send + Sync + 'static,
35	for<'a> &'a R: TryFrom<&'a V::Resource>,
36{
37	/// Create a new `Page` result from a `Bundle` and client.
38	pub(crate) const fn new(
39		client: Client<V>,
40		bundle: V::Bundle,
41		correlation_id: HeaderValue,
42	) -> Self {
43		Self { client, bundle, correlation_id, _resource_type: PhantomData }
44	}
45
46	/// Get the next page URL, if there is one.
47	pub fn next_page_url(&self) -> Option<&String> {
48		self.bundle.next_page_url()
49	}
50
51	/// Fetch the next page and return it.
52	pub async fn next_page(&self) -> Option<Result<Self, Error>> {
53		let next_page_url = self.next_page_url()?;
54		let url = match Url::parse(next_page_url) {
55			Ok(url) => url,
56			Err(_err) => return Some(Err(Error::UrlParse(next_page_url.clone()))),
57		};
58
59		tracing::debug!("Fetching next page from URL: {next_page_url}");
60		let next_bundle = match self
61			.client
62			.read_generic::<V::Bundle>(url, Some(self.correlation_id.clone()))
63			.await
64		{
65			Ok(Some(bundle)) => bundle,
66			Ok(None) => return Some(Err(Error::ResourceNotFound(next_page_url.clone()))),
67			Err(err) => return Some(Err(err)),
68		};
69
70		Some(Ok(Self::new(self.client.clone(), next_bundle, self.correlation_id.clone())))
71	}
72
73	/// Get the `total` field, indicating the total number of results.
74	pub fn total(&self) -> Option<u32> {
75		self.bundle.total()
76	}
77
78	/// Get access to the inner `Bundle`.
79	pub const fn bundle(&self) -> &V::Bundle {
80		&self.bundle
81	}
82
83	/// Consume the `Page` and return the inner `Bundle`.
84	pub fn into_inner(self) -> V::Bundle {
85		self.bundle
86	}
87
88	/// Consumes the raw inner entries, leaving the page empty. Returns the entries.
89	pub fn take_entries(&mut self) -> Vec<Option<BundleEntry<V>>> {
90		self.bundle.take_entries()
91	}
92
93	/// Get the entries of this page, ignoring entries whenever there is no `resource` in the entry.
94	pub fn entries(&self) -> impl Iterator<Item = &V::Resource> + Send {
95		self.bundle.entries().filter_map(|entry| entry.resource())
96	}
97
98	/// Get the matches of this page, ignoring entries whenever there is no `resource` in the entry
99	/// or resources of the wrong type.
100	pub fn matches(&self) -> impl Iterator<Item = &R> + Send {
101		self.bundle
102			.entries()
103			.filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match))
104			.filter_map(|entry| entry.resource())
105			.filter_map(|resource| resource.try_into().ok())
106	}
107
108	/// Get the entries of this page, where the `fullUrl` is automatically resolved whenever there
109	/// is no `resource` in the entry. Delete entries are ignored as well (e.g. in history
110	/// requests), you can access the raw entries with [Self::bundle] or [Self::take_entries] if you
111	/// need these.
112	/// Consumes the entries, leaving the page empty.
113	pub fn entries_owned(
114		&mut self,
115	) -> impl Stream<Item = Result<V::Resource, Error>> + Send + 'static {
116		let client = self.client.clone();
117		let correlation_id = self.correlation_id.clone();
118		stream::iter(self.take_entries().into_iter().flatten()).filter_map(move |entry| {
119			resolve_bundle_entry(entry, client.clone(), correlation_id.clone())
120		})
121	}
122
123	/// Get the matches of this page, where the `fullUrl` is automatically resolved whenever there
124	/// is no `resource` in the entry. Delete entries are ignored as well (e.g. in history
125	/// requests), you can access the raw entries with [Self::bundle] or [Self::take_entries] if you
126	/// need these. Ignores entries of the wrong resource type and entries without resource or full
127	/// URL.
128	/// Consumes the entries, leaving the page empty.
129	pub fn matches_owned(&mut self) -> impl Stream<Item = Result<R, Error>> + Send + 'static {
130		let client = self.client.clone();
131		let correlation_id = self.correlation_id.clone();
132		stream::iter(
133			self.take_entries()
134				.into_iter()
135				.flatten()
136				.filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match)),
137		)
138		.filter_map(move |entry| {
139			resolve_bundle_entry(entry, client.clone(), correlation_id.clone())
140		})
141		.try_filter_map(|resource| std::future::ready(Ok(resource.try_into().ok())))
142	}
143
144	/// Start automatic paging through all entries across pages.
145	///
146	/// Hint: you can activate pre-fetching by [StreamExt::buffered].
147	pub fn all_entries(
148		mut self,
149	) -> impl Stream<Item = Result<V::Resource, Error>> + Send + 'static {
150		self.entries_owned()
151			.chain(
152				stream::once(async move { self.next_page().await })
153					.filter_map(std::future::ready)
154					.map_ok(Self::all_entries)
155					.try_flatten(),
156			)
157			.boxed() // Somehow gives error when using if not boxed?
158	}
159
160	/// Start automatic paging through all matches across pages.
161	///
162	/// Hint: you can activate pre-fetching by [StreamExt::buffered].
163	pub fn all_matches(mut self) -> impl Stream<Item = Result<R, Error>> + Send + 'static {
164		self.matches_owned()
165			.chain(
166				stream::once(async move { self.next_page().await })
167					.filter_map(std::future::ready)
168					.map_ok(Self::all_matches)
169					.try_flatten(),
170			)
171			.boxed() // Somehow gives error when using if not boxed?
172	}
173}
174
175/// Convert the bundle entry into a resource, resolving the `fullUrl` if there is no resource
176/// inside and it is not a `DELETE` request. Returns `None` if there is neither resource nor full
177/// URL or it is a `DELETE` request.
178async fn resolve_bundle_entry<V: FhirVersion>(
179	entry: BundleEntry<V>,
180	client: Client<V>,
181	correlation_id: HeaderValue,
182) -> Option<Result<V::Resource, Error>>
183where
184	(StatusCode, V::OperationOutcome): Into<Error>,
185{
186	if entry.resource().is_some() {
187		return entry.into_resource().map(Ok);
188	}
189
190	if let Some(request) = entry.request() {
191		if request.is_delete() {
192			return None;
193		}
194	}
195
196	let full_url = entry.full_url()?;
197	let url = match Url::parse(full_url) {
198		Ok(url) => url,
199		Err(_err) => return Some(Err(Error::UrlParse(full_url.clone()))),
200	};
201
202	let result = client
203		.read_generic::<V::Resource>(url, Some(correlation_id))
204		.await
205		.and_then(|opt| opt.ok_or_else(|| Error::ResourceNotFound(full_url.clone())));
206	Some(result)
207}
208
209impl<V: FhirVersion, R> Clone for Page<V, R> {
210	fn clone(&self) -> Self {
211		Self {
212			client: self.client.clone(),
213			bundle: self.bundle.clone(),
214			correlation_id: self.correlation_id.clone(),
215			_resource_type: self._resource_type,
216		}
217	}
218}
219
220impl<V: FhirVersion, R> Debug for Page<V, R> {
221	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222		f.debug_struct("Page")
223			.field("client", &self.client)
224			.field("bundle", &self.bundle)
225			.field("correlation_id", &self.correlation_id)
226			.field("_resource_type", &type_name::<R>())
227			.finish()
228	}
229}