firestore/db/
listen_changes.rs

1use crate::db::safe_document_path;
2use crate::errors::*;
3use crate::timestamp_utils::to_timestamp;
4use crate::{FirestoreDb, FirestoreQueryParams, FirestoreResult, FirestoreResumeStateStorage};
5pub use async_trait::async_trait;
6use chrono::prelude::*;
7use futures::stream::BoxStream;
8use futures::StreamExt;
9use futures::TryFutureExt;
10use futures::TryStreamExt;
11use gcloud_sdk::google::firestore::v1::*;
12use rsb_derive::*;
13pub use rvstruct::ValueStruct;
14use std::collections::HashMap;
15use std::future::Future;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
19use tokio::task::JoinHandle;
20use tracing::*;
21
22#[derive(Debug, Clone, Builder)]
23pub struct FirestoreListenerTargetParams {
24    pub target: FirestoreListenerTarget,
25    pub target_type: FirestoreTargetType,
26    pub resume_type: Option<FirestoreListenerTargetResumeType>,
27    pub add_target_once: Option<bool>,
28    pub labels: HashMap<String, String>,
29}
30
31impl FirestoreListenerTargetParams {
32    pub fn validate(&self) -> FirestoreResult<()> {
33        self.target.validate()?;
34        Ok(())
35    }
36}
37
38#[derive(Debug, Clone, Builder)]
39pub struct FirestoreCollectionDocuments {
40    pub parent: Option<String>,
41    pub collection: String,
42    pub documents: Vec<String>,
43}
44
45#[allow(clippy::large_enum_variant)]
46#[derive(Debug, Clone)]
47pub enum FirestoreTargetType {
48    Query(FirestoreQueryParams),
49    Documents(FirestoreCollectionDocuments),
50}
51
52#[derive(Debug, Clone)]
53pub enum FirestoreListenerTargetResumeType {
54    Token(FirestoreListenerToken),
55    ReadTime(DateTime<Utc>),
56}
57
58#[async_trait]
59pub trait FirestoreListenSupport {
60    async fn listen_doc_changes<'a, 'b>(
61        &'a self,
62        targets: Vec<FirestoreListenerTargetParams>,
63    ) -> FirestoreResult<BoxStream<'b, FirestoreResult<ListenResponse>>>;
64}
65
66#[async_trait]
67impl FirestoreListenSupport for FirestoreDb {
68    async fn listen_doc_changes<'a, 'b>(
69        &'a self,
70        targets: Vec<FirestoreListenerTargetParams>,
71    ) -> FirestoreResult<BoxStream<'b, FirestoreResult<ListenResponse>>> {
72        let listen_requests = targets
73            .into_iter()
74            .map(|target_params| self.create_listen_request(target_params))
75            .collect::<FirestoreResult<Vec<ListenRequest>>>()?;
76
77        let request = gcloud_sdk::tonic::Request::new(
78            futures::stream::iter(listen_requests).chain(futures::stream::pending()),
79        );
80
81        let response = self.client().get().listen(request).await?;
82
83        Ok(response.into_inner().map_err(|e| e.into()).boxed())
84    }
85}
86
87#[derive(Clone, Debug, Eq, PartialEq, Hash, ValueStruct)]
88pub struct FirestoreListenerTarget(u32);
89
90impl FirestoreListenerTarget {
91    pub fn validate(&self) -> FirestoreResult<()> {
92        if *self.value() == 0 {
93            Err(FirestoreError::InvalidParametersError(
94                FirestoreInvalidParametersError::new(FirestoreInvalidParametersPublicDetails::new(
95                    "target_id".to_string(),
96                    "Listener target ID cannot be zero".to_string(),
97                )),
98            ))
99        } else if *self.value() > i32::MAX as u32 {
100            Err(FirestoreError::InvalidParametersError(
101                FirestoreInvalidParametersError::new(FirestoreInvalidParametersPublicDetails::new(
102                    "target_id".to_string(),
103                    format!(
104                        "Listener target ID cannot be more than: {}. {} is specified",
105                        i32::MAX,
106                        self.value()
107                    ),
108                )),
109            ))
110        } else {
111            Ok(())
112        }
113    }
114}
115
116impl TryInto<i32> for FirestoreListenerTarget {
117    type Error = FirestoreError;
118
119    fn try_into(self) -> FirestoreResult<i32> {
120        self.validate()?;
121        (*self.value()).try_into().map_err(|e| {
122            FirestoreError::InvalidParametersError(FirestoreInvalidParametersError::new(
123                FirestoreInvalidParametersPublicDetails::new(
124                    "target_id".to_string(),
125                    format!("Invalid target ID: {} {}", self.value(), e),
126                ),
127            ))
128        })
129    }
130}
131
132impl TryFrom<i32> for FirestoreListenerTarget {
133    type Error = FirestoreError;
134
135    fn try_from(value: i32) -> FirestoreResult<Self> {
136        value
137            .try_into()
138            .map_err(|e| {
139                FirestoreError::InvalidParametersError(FirestoreInvalidParametersError::new(
140                    FirestoreInvalidParametersPublicDetails::new(
141                        "target_id".to_string(),
142                        format!("Invalid target ID: {value} {e}"),
143                    ),
144                ))
145            })
146            .map(FirestoreListenerTarget)
147    }
148}
149
150#[derive(Clone, Debug, ValueStruct)]
151pub struct FirestoreListenerToken(Vec<u8>);
152
153impl FirestoreDb {
154    pub async fn create_listener<S>(
155        &self,
156        storage: S,
157    ) -> FirestoreResult<FirestoreListener<FirestoreDb, S>>
158    where
159        S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
160    {
161        self.create_listener_with_params(storage, FirestoreListenerParams::new())
162            .await
163    }
164
165    pub async fn create_listener_with_params<S>(
166        &self,
167        storage: S,
168        params: FirestoreListenerParams,
169    ) -> FirestoreResult<FirestoreListener<FirestoreDb, S>>
170    where
171        S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
172    {
173        FirestoreListener::new(self.clone(), storage, params).await
174    }
175
176    fn create_listen_request(
177        &self,
178        target_params: FirestoreListenerTargetParams,
179    ) -> FirestoreResult<ListenRequest> {
180        Ok(ListenRequest {
181            database: self.get_database_path().to_string(),
182            labels: target_params.labels,
183            target_change: Some(listen_request::TargetChange::AddTarget(Target {
184                target_id: target_params.target.try_into()?,
185                once: target_params.add_target_once.unwrap_or(false),
186                target_type: Some(match target_params.target_type {
187                    FirestoreTargetType::Query(query_params) => {
188                        target::TargetType::Query(target::QueryTarget {
189                            parent: query_params
190                                .parent
191                                .as_ref()
192                                .unwrap_or_else(|| self.get_documents_path())
193                                .clone(),
194                            query_type: Some(target::query_target::QueryType::StructuredQuery(
195                                query_params.try_into()?,
196                            )),
197                        })
198                    }
199                    FirestoreTargetType::Documents(collection_documents) => {
200                        target::TargetType::Documents(target::DocumentsTarget {
201                            documents: collection_documents
202                                .documents
203                                .into_iter()
204                                .map(|doc_id| {
205                                    safe_document_path(
206                                        collection_documents
207                                            .parent
208                                            .as_deref()
209                                            .unwrap_or_else(|| self.get_documents_path()),
210                                        collection_documents.collection.as_str(),
211                                        doc_id,
212                                    )
213                                })
214                                .collect::<FirestoreResult<Vec<String>>>()?,
215                        })
216                    }
217                }),
218                resume_type: target_params
219                    .resume_type
220                    .map(|resume_type| match resume_type {
221                        FirestoreListenerTargetResumeType::Token(token) => {
222                            target::ResumeType::ResumeToken(token.into_value())
223                        }
224                        FirestoreListenerTargetResumeType::ReadTime(dt) => {
225                            target::ResumeType::ReadTime(to_timestamp(dt))
226                        }
227                    }),
228                ..Default::default()
229            })),
230        })
231    }
232}
233
234pub type FirestoreListenEvent = listen_response::ResponseType;
235
236#[derive(Debug, Clone, Eq, PartialEq, Builder)]
237pub struct FirestoreListenerParams {
238    pub retry_delay: Option<std::time::Duration>,
239}
240
241pub struct FirestoreListener<D, S>
242where
243    D: FirestoreListenSupport,
244    S: FirestoreResumeStateStorage,
245{
246    db: D,
247    storage: S,
248    listener_params: FirestoreListenerParams,
249    targets: Vec<FirestoreListenerTargetParams>,
250    shutdown_flag: Arc<AtomicBool>,
251    shutdown_handle: Option<JoinHandle<()>>,
252    shutdown_writer: Option<Arc<UnboundedSender<i8>>>,
253}
254
255impl<D, S> FirestoreListener<D, S>
256where
257    D: FirestoreListenSupport + Clone + Send + Sync + 'static,
258    S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
259{
260    pub async fn new(
261        db: D,
262        storage: S,
263        listener_params: FirestoreListenerParams,
264    ) -> FirestoreResult<FirestoreListener<D, S>> {
265        Ok(FirestoreListener {
266            db,
267            storage,
268            listener_params,
269            targets: vec![],
270            shutdown_flag: Arc::new(AtomicBool::new(false)),
271            shutdown_handle: None,
272            shutdown_writer: None,
273        })
274    }
275
276    pub fn add_target(
277        &mut self,
278        target_params: FirestoreListenerTargetParams,
279    ) -> FirestoreResult<()> {
280        target_params.validate()?;
281        self.targets.push(target_params);
282        Ok(())
283    }
284
285    pub async fn start<FN, F>(&mut self, cb: FN) -> FirestoreResult<()>
286    where
287        FN: Fn(FirestoreListenEvent) -> F + Send + Sync + 'static,
288        F: Future<Output = AnyBoxedErrResult<()>> + Send + 'static,
289    {
290        info!(
291            num_targets = self.targets.len(),
292            "Starting a Firestore listener for targets...",
293        );
294
295        let mut initial_states: HashMap<FirestoreListenerTarget, FirestoreListenerTargetParams> =
296            HashMap::new();
297        for target_params in &self.targets {
298            match &target_params.resume_type {
299                Some(resume_type) => {
300                    initial_states.insert(
301                        target_params.target.clone(),
302                        target_params.clone().with_resume_type(resume_type.clone()),
303                    );
304                }
305                None => {
306                    let resume_type = self
307                        .storage
308                        .read_resume_state(&target_params.target)
309                        .map_err(|err| {
310                            FirestoreError::SystemError(FirestoreSystemError::new(
311                                FirestoreErrorPublicGenericDetails::new("SystemError".into()),
312                                format!("Listener init error: {err}"),
313                            ))
314                        })
315                        .await?;
316                    initial_states.insert(
317                        target_params.target.clone(),
318                        target_params.clone().opt_resume_type(resume_type),
319                    );
320                }
321            }
322        }
323
324        if initial_states.is_empty() {
325            warn!("No initial states for listener targets. Exiting...");
326            return Ok(());
327        }
328
329        let (tx, rx): (UnboundedSender<i8>, UnboundedReceiver<i8>) =
330            tokio::sync::mpsc::unbounded_channel();
331
332        self.shutdown_writer = Some(Arc::new(tx));
333        self.shutdown_handle = Some(tokio::spawn(Self::listener_loop(
334            self.db.clone(),
335            self.storage.clone(),
336            self.shutdown_flag.clone(),
337            initial_states,
338            self.listener_params.clone(),
339            rx,
340            cb,
341        )));
342        Ok(())
343    }
344
345    pub async fn shutdown(&mut self) -> FirestoreResult<()> {
346        debug!("Shutting down Firestore listener...");
347        self.shutdown_flag.store(true, Ordering::Relaxed);
348        if let Some(shutdown_writer) = self.shutdown_writer.take() {
349            shutdown_writer.send(1).ok();
350        }
351        if let Some(signaller) = self.shutdown_handle.take() {
352            if let Err(err) = signaller.await {
353                warn!(%err, "Firestore listener exit error!");
354            };
355        }
356        debug!("Shutting down Firestore listener has been finished...");
357        Ok(())
358    }
359
360    async fn listener_loop<FN, F>(
361        db: D,
362        storage: S,
363        shutdown_flag: Arc<AtomicBool>,
364        mut targets_state: HashMap<FirestoreListenerTarget, FirestoreListenerTargetParams>,
365        listener_params: FirestoreListenerParams,
366        mut shutdown_receiver: UnboundedReceiver<i8>,
367        cb: FN,
368    ) where
369        D: FirestoreListenSupport + Clone + Send + Sync,
370        FN: Fn(FirestoreListenEvent) -> F + Send + Sync,
371        F: Future<Output = AnyBoxedErrResult<()>> + Send,
372    {
373        let effective_delay = listener_params
374            .retry_delay
375            .unwrap_or_else(|| std::time::Duration::from_secs(5));
376
377        while !shutdown_flag.load(Ordering::Relaxed) {
378            debug!(
379                num_targets = targets_state.len(),
380                "Start listening on targets..."
381            );
382
383            match db
384                .listen_doc_changes(targets_state.values().cloned().collect())
385                .await
386            {
387                Err(err) => {
388                    if Self::check_listener_if_permanent_error(err, effective_delay).await {
389                        shutdown_flag.store(true, Ordering::Relaxed);
390                    }
391                }
392                Ok(mut listen_stream) => loop {
393                    tokio::select! {
394                        shutdown_trigger = shutdown_receiver.recv() => {
395                            if shutdown_trigger.is_none() {
396                                debug!("Listener dropped. Exiting...");
397                                shutdown_flag.store(true, Ordering::Relaxed);
398                            }
399                            debug!(num_targets = targets_state.len(), "Exiting from listener on targets...");
400                            shutdown_receiver.close();
401                            break;
402                        }
403                        tried = listen_stream.try_next() => {
404                            if shutdown_flag.load(Ordering::Relaxed) {
405                                break;
406                            }
407                            else {
408                                match tried {
409                                    Ok(Some(event)) => {
410                                        trace!(?event, "Received a listen response event to handle.");
411
412                                        match event.response_type {
413                                            Some(listen_response::ResponseType::TargetChange(ref target_change))
414                                                if !target_change.resume_token.is_empty() =>
415                                            {
416                                                for target_id_num in &target_change.target_ids {
417                                                    match FirestoreListenerTarget::try_from(*target_id_num) {
418                                                        Ok(target_id) => {
419                                                            if let Some(target) = targets_state.get_mut(&target_id) {
420                                                                let new_token: FirestoreListenerToken = target_change.resume_token.clone().into();
421
422                                                                if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await {
423                                                                    error!(%err, "Listener token storage error occurred.");
424                                                                    break;
425                                                                }
426                                                                else {
427                                                                    target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token))
428                                                                }
429                                                            }
430                                                        },
431                                                        Err(err) => {
432                                                            error!(%err, target_id_num, "Listener system error - unexpected target ID.");
433                                                            break;
434                                                        }
435                                                    }
436                                                }
437
438                                            }
439                                            Some(response_type) => {
440                                                if let Err(err) = cb(response_type).await {
441                                                    error!(%err, "Listener callback function error occurred.");
442                                                    break;
443                                                }
444                                            }
445                                            None  =>  {}
446                                        }
447                                    }
448                                    Ok(None) => break,
449                                    Err(err) => {
450                                        if Self::check_listener_if_permanent_error(err, effective_delay).await {
451                                            shutdown_flag.store(true, Ordering::Relaxed);
452                                        }
453                                        break;
454                                    }
455                                }
456                            }
457                        }
458                    }
459                },
460            }
461        }
462    }
463
464    async fn check_listener_if_permanent_error(
465        err: FirestoreError,
466        delay: std::time::Duration,
467    ) -> bool {
468        match err {
469            FirestoreError::DatabaseError(ref db_err)
470                if db_err.details.contains("unexpected end of file")
471                    || db_err.details.contains("stream error received") =>
472            {
473                debug!(%err, ?delay, "Listen EOF.. Restarting after the specified delay...");
474                tokio::time::sleep(delay).await;
475                false
476            }
477            FirestoreError::DatabaseError(ref db_err)
478                if db_err.public.code.contains("InvalidArgument") =>
479            {
480                error!(%err, "Listen error. Exiting...");
481                true
482            }
483            FirestoreError::InvalidParametersError(_) => {
484                error!(%err, "Listen error. Exiting...");
485                true
486            }
487            _ => {
488                error!(%err, ?delay, "Listen error. Restarting after the specified delay...");
489                tokio::time::sleep(delay).await;
490                false
491            }
492        }
493    }
494}