fhir_sdk/client/fhir/
paging.rs1use std::{any::type_name, fmt::Debug, marker::PhantomData};
4
5use futures::{Stream, StreamExt, TryStreamExt, stream};
6use reqwest::{StatusCode, Url, header::HeaderValue};
7
8use super::{Client, Error};
9use crate::{
10 extensions::{BundleEntryExt, BundleEntryRequestExt, BundleExt, SearchEntryModeExt},
11 version::FhirVersion,
12};
13
14type BundleEntry<V> = <<V as FhirVersion>::Bundle as BundleExt>::Entry;
16
17pub struct Page<V: FhirVersion, R> {
20 client: Client<V>,
22 bundle: V::Bundle,
24 correlation_id: HeaderValue,
26
27 _resource_type: PhantomData<R>,
29}
30
31impl<V: FhirVersion, R> Page<V, R>
32where
33 (StatusCode, V::OperationOutcome): Into<Error>,
34 R: TryFrom<V::Resource> + Send + Sync + 'static,
35 for<'a> &'a R: TryFrom<&'a V::Resource>,
36{
37 pub(crate) const fn new(
39 client: Client<V>,
40 bundle: V::Bundle,
41 correlation_id: HeaderValue,
42 ) -> Self {
43 Self { client, bundle, correlation_id, _resource_type: PhantomData }
44 }
45
46 pub fn next_page_url(&self) -> Option<&String> {
48 self.bundle.next_page_url()
49 }
50
51 pub async fn next_page(&self) -> Option<Result<Self, Error>> {
53 let next_page_url = self.next_page_url()?;
54 let url = match Url::parse(next_page_url) {
55 Ok(url) => url,
56 Err(_err) => return Some(Err(Error::UrlParse(next_page_url.clone()))),
57 };
58
59 tracing::debug!("Fetching next page from URL: {next_page_url}");
60 let next_bundle = match self
61 .client
62 .read_generic::<V::Bundle>(url, Some(self.correlation_id.clone()))
63 .await
64 {
65 Ok(Some(bundle)) => bundle,
66 Ok(None) => return Some(Err(Error::ResourceNotFound(next_page_url.clone()))),
67 Err(err) => return Some(Err(err)),
68 };
69
70 Some(Ok(Self::new(self.client.clone(), next_bundle, self.correlation_id.clone())))
71 }
72
73 pub fn total(&self) -> Option<u32> {
75 self.bundle.total()
76 }
77
78 pub const fn bundle(&self) -> &V::Bundle {
80 &self.bundle
81 }
82
83 pub fn into_inner(self) -> V::Bundle {
85 self.bundle
86 }
87
88 pub fn take_entries(&mut self) -> Vec<Option<BundleEntry<V>>> {
90 self.bundle.take_entries()
91 }
92
93 pub fn entries(&self) -> impl Iterator<Item = &V::Resource> + Send {
95 self.bundle.entries().filter_map(|entry| entry.resource())
96 }
97
98 pub fn matches(&self) -> impl Iterator<Item = &R> + Send {
101 self.bundle
102 .entries()
103 .filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match))
104 .filter_map(|entry| entry.resource())
105 .filter_map(|resource| resource.try_into().ok())
106 }
107
108 pub fn entries_owned(
114 &mut self,
115 ) -> impl Stream<Item = Result<V::Resource, Error>> + Send + 'static {
116 let client = self.client.clone();
117 let correlation_id = self.correlation_id.clone();
118 stream::iter(self.take_entries().into_iter().flatten()).filter_map(move |entry| {
119 resolve_bundle_entry(entry, client.clone(), correlation_id.clone())
120 })
121 }
122
123 pub fn matches_owned(&mut self) -> impl Stream<Item = Result<R, Error>> + Send + 'static {
130 let client = self.client.clone();
131 let correlation_id = self.correlation_id.clone();
132 stream::iter(
133 self.take_entries()
134 .into_iter()
135 .flatten()
136 .filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match)),
137 )
138 .filter_map(move |entry| {
139 resolve_bundle_entry(entry, client.clone(), correlation_id.clone())
140 })
141 .try_filter_map(|resource| std::future::ready(Ok(resource.try_into().ok())))
142 }
143
144 pub fn all_entries(
148 mut self,
149 ) -> impl Stream<Item = Result<V::Resource, Error>> + Send + 'static {
150 self.entries_owned()
151 .chain(
152 stream::once(async move { self.next_page().await })
153 .filter_map(std::future::ready)
154 .map_ok(Self::all_entries)
155 .try_flatten(),
156 )
157 .boxed() }
159
160 pub fn all_matches(mut self) -> impl Stream<Item = Result<R, Error>> + Send + 'static {
164 self.matches_owned()
165 .chain(
166 stream::once(async move { self.next_page().await })
167 .filter_map(std::future::ready)
168 .map_ok(Self::all_matches)
169 .try_flatten(),
170 )
171 .boxed() }
173}
174
175async fn resolve_bundle_entry<V: FhirVersion>(
179 entry: BundleEntry<V>,
180 client: Client<V>,
181 correlation_id: HeaderValue,
182) -> Option<Result<V::Resource, Error>>
183where
184 (StatusCode, V::OperationOutcome): Into<Error>,
185{
186 if entry.resource().is_some() {
187 return entry.into_resource().map(Ok);
188 }
189
190 if let Some(request) = entry.request() {
191 if request.is_delete() {
192 return None;
193 }
194 }
195
196 let full_url = entry.full_url()?;
197 let url = match Url::parse(full_url) {
198 Ok(url) => url,
199 Err(_err) => return Some(Err(Error::UrlParse(full_url.clone()))),
200 };
201
202 let result = client
203 .read_generic::<V::Resource>(url, Some(correlation_id))
204 .await
205 .and_then(|opt| opt.ok_or_else(|| Error::ResourceNotFound(full_url.clone())));
206 Some(result)
207}
208
209impl<V: FhirVersion, R> Clone for Page<V, R> {
210 fn clone(&self) -> Self {
211 Self {
212 client: self.client.clone(),
213 bundle: self.bundle.clone(),
214 correlation_id: self.correlation_id.clone(),
215 _resource_type: self._resource_type,
216 }
217 }
218}
219
220impl<V: FhirVersion, R> Debug for Page<V, R> {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("Page")
223 .field("client", &self.client)
224 .field("bundle", &self.bundle)
225 .field("correlation_id", &self.correlation_id)
226 .field("_resource_type", &type_name::<R>())
227 .finish()
228 }
229}