1use crate::db::safe_document_path;
2use crate::errors::*;
3use crate::timestamp_utils::to_timestamp;
4use crate::{FirestoreDb, FirestoreQueryParams, FirestoreResult, FirestoreResumeStateStorage};
5pub use async_trait::async_trait;
6use chrono::prelude::*;
7use futures::stream::BoxStream;
8use futures::StreamExt;
9use futures::TryFutureExt;
10use futures::TryStreamExt;
11use gcloud_sdk::google::firestore::v1::*;
12use rsb_derive::*;
13pub use rvstruct::ValueStruct;
14use std::collections::HashMap;
15use std::future::Future;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
19use tokio::task::JoinHandle;
20use tracing::*;
21
22#[derive(Debug, Clone, Builder)]
23pub struct FirestoreListenerTargetParams {
24 pub target: FirestoreListenerTarget,
25 pub target_type: FirestoreTargetType,
26 pub resume_type: Option<FirestoreListenerTargetResumeType>,
27 pub add_target_once: Option<bool>,
28 pub labels: HashMap<String, String>,
29}
30
31impl FirestoreListenerTargetParams {
32 pub fn validate(&self) -> FirestoreResult<()> {
33 self.target.validate()?;
34 Ok(())
35 }
36}
37
38#[derive(Debug, Clone, Builder)]
39pub struct FirestoreCollectionDocuments {
40 pub parent: Option<String>,
41 pub collection: String,
42 pub documents: Vec<String>,
43}
44
45#[allow(clippy::large_enum_variant)]
46#[derive(Debug, Clone)]
47pub enum FirestoreTargetType {
48 Query(FirestoreQueryParams),
49 Documents(FirestoreCollectionDocuments),
50}
51
52#[derive(Debug, Clone)]
53pub enum FirestoreListenerTargetResumeType {
54 Token(FirestoreListenerToken),
55 ReadTime(DateTime<Utc>),
56}
57
58#[async_trait]
59pub trait FirestoreListenSupport {
60 async fn listen_doc_changes<'a, 'b>(
61 &'a self,
62 targets: Vec<FirestoreListenerTargetParams>,
63 ) -> FirestoreResult<BoxStream<'b, FirestoreResult<ListenResponse>>>;
64}
65
66#[async_trait]
67impl FirestoreListenSupport for FirestoreDb {
68 async fn listen_doc_changes<'a, 'b>(
69 &'a self,
70 targets: Vec<FirestoreListenerTargetParams>,
71 ) -> FirestoreResult<BoxStream<'b, FirestoreResult<ListenResponse>>> {
72 let listen_requests = targets
73 .into_iter()
74 .map(|target_params| self.create_listen_request(target_params))
75 .collect::<FirestoreResult<Vec<ListenRequest>>>()?;
76
77 let request = gcloud_sdk::tonic::Request::new(
78 futures::stream::iter(listen_requests).chain(futures::stream::pending()),
79 );
80
81 let response = self.client().get().listen(request).await?;
82
83 Ok(response.into_inner().map_err(|e| e.into()).boxed())
84 }
85}
86
87#[derive(Clone, Debug, Eq, PartialEq, Hash, ValueStruct)]
88pub struct FirestoreListenerTarget(u32);
89
90impl FirestoreListenerTarget {
91 pub fn validate(&self) -> FirestoreResult<()> {
92 if *self.value() == 0 {
93 Err(FirestoreError::InvalidParametersError(
94 FirestoreInvalidParametersError::new(FirestoreInvalidParametersPublicDetails::new(
95 "target_id".to_string(),
96 "Listener target ID cannot be zero".to_string(),
97 )),
98 ))
99 } else if *self.value() > i32::MAX as u32 {
100 Err(FirestoreError::InvalidParametersError(
101 FirestoreInvalidParametersError::new(FirestoreInvalidParametersPublicDetails::new(
102 "target_id".to_string(),
103 format!(
104 "Listener target ID cannot be more than: {}. {} is specified",
105 i32::MAX,
106 self.value()
107 ),
108 )),
109 ))
110 } else {
111 Ok(())
112 }
113 }
114}
115
116impl TryInto<i32> for FirestoreListenerTarget {
117 type Error = FirestoreError;
118
119 fn try_into(self) -> FirestoreResult<i32> {
120 self.validate()?;
121 (*self.value()).try_into().map_err(|e| {
122 FirestoreError::InvalidParametersError(FirestoreInvalidParametersError::new(
123 FirestoreInvalidParametersPublicDetails::new(
124 "target_id".to_string(),
125 format!("Invalid target ID: {} {}", self.value(), e),
126 ),
127 ))
128 })
129 }
130}
131
132impl TryFrom<i32> for FirestoreListenerTarget {
133 type Error = FirestoreError;
134
135 fn try_from(value: i32) -> FirestoreResult<Self> {
136 value
137 .try_into()
138 .map_err(|e| {
139 FirestoreError::InvalidParametersError(FirestoreInvalidParametersError::new(
140 FirestoreInvalidParametersPublicDetails::new(
141 "target_id".to_string(),
142 format!("Invalid target ID: {value} {e}"),
143 ),
144 ))
145 })
146 .map(FirestoreListenerTarget)
147 }
148}
149
150#[derive(Clone, Debug, ValueStruct)]
151pub struct FirestoreListenerToken(Vec<u8>);
152
153impl FirestoreDb {
154 pub async fn create_listener<S>(
155 &self,
156 storage: S,
157 ) -> FirestoreResult<FirestoreListener<FirestoreDb, S>>
158 where
159 S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
160 {
161 self.create_listener_with_params(storage, FirestoreListenerParams::new())
162 .await
163 }
164
165 pub async fn create_listener_with_params<S>(
166 &self,
167 storage: S,
168 params: FirestoreListenerParams,
169 ) -> FirestoreResult<FirestoreListener<FirestoreDb, S>>
170 where
171 S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
172 {
173 FirestoreListener::new(self.clone(), storage, params).await
174 }
175
176 fn create_listen_request(
177 &self,
178 target_params: FirestoreListenerTargetParams,
179 ) -> FirestoreResult<ListenRequest> {
180 Ok(ListenRequest {
181 database: self.get_database_path().to_string(),
182 labels: target_params.labels,
183 target_change: Some(listen_request::TargetChange::AddTarget(Target {
184 target_id: target_params.target.try_into()?,
185 once: target_params.add_target_once.unwrap_or(false),
186 target_type: Some(match target_params.target_type {
187 FirestoreTargetType::Query(query_params) => {
188 target::TargetType::Query(target::QueryTarget {
189 parent: query_params
190 .parent
191 .as_ref()
192 .unwrap_or_else(|| self.get_documents_path())
193 .clone(),
194 query_type: Some(target::query_target::QueryType::StructuredQuery(
195 query_params.try_into()?,
196 )),
197 })
198 }
199 FirestoreTargetType::Documents(collection_documents) => {
200 target::TargetType::Documents(target::DocumentsTarget {
201 documents: collection_documents
202 .documents
203 .into_iter()
204 .map(|doc_id| {
205 safe_document_path(
206 collection_documents
207 .parent
208 .as_deref()
209 .unwrap_or_else(|| self.get_documents_path()),
210 collection_documents.collection.as_str(),
211 doc_id,
212 )
213 })
214 .collect::<FirestoreResult<Vec<String>>>()?,
215 })
216 }
217 }),
218 resume_type: target_params
219 .resume_type
220 .map(|resume_type| match resume_type {
221 FirestoreListenerTargetResumeType::Token(token) => {
222 target::ResumeType::ResumeToken(token.into_value())
223 }
224 FirestoreListenerTargetResumeType::ReadTime(dt) => {
225 target::ResumeType::ReadTime(to_timestamp(dt))
226 }
227 }),
228 ..Default::default()
229 })),
230 })
231 }
232}
233
234pub type FirestoreListenEvent = listen_response::ResponseType;
235
236#[derive(Debug, Clone, Eq, PartialEq, Builder)]
237pub struct FirestoreListenerParams {
238 pub retry_delay: Option<std::time::Duration>,
239}
240
241pub struct FirestoreListener<D, S>
242where
243 D: FirestoreListenSupport,
244 S: FirestoreResumeStateStorage,
245{
246 db: D,
247 storage: S,
248 listener_params: FirestoreListenerParams,
249 targets: Vec<FirestoreListenerTargetParams>,
250 shutdown_flag: Arc<AtomicBool>,
251 shutdown_handle: Option<JoinHandle<()>>,
252 shutdown_writer: Option<Arc<UnboundedSender<i8>>>,
253}
254
255impl<D, S> FirestoreListener<D, S>
256where
257 D: FirestoreListenSupport + Clone + Send + Sync + 'static,
258 S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static,
259{
260 pub async fn new(
261 db: D,
262 storage: S,
263 listener_params: FirestoreListenerParams,
264 ) -> FirestoreResult<FirestoreListener<D, S>> {
265 Ok(FirestoreListener {
266 db,
267 storage,
268 listener_params,
269 targets: vec![],
270 shutdown_flag: Arc::new(AtomicBool::new(false)),
271 shutdown_handle: None,
272 shutdown_writer: None,
273 })
274 }
275
276 pub fn add_target(
277 &mut self,
278 target_params: FirestoreListenerTargetParams,
279 ) -> FirestoreResult<()> {
280 target_params.validate()?;
281 self.targets.push(target_params);
282 Ok(())
283 }
284
285 pub async fn start<FN, F>(&mut self, cb: FN) -> FirestoreResult<()>
286 where
287 FN: Fn(FirestoreListenEvent) -> F + Send + Sync + 'static,
288 F: Future<Output = AnyBoxedErrResult<()>> + Send + 'static,
289 {
290 info!(
291 num_targets = self.targets.len(),
292 "Starting a Firestore listener for targets...",
293 );
294
295 let mut initial_states: HashMap<FirestoreListenerTarget, FirestoreListenerTargetParams> =
296 HashMap::new();
297 for target_params in &self.targets {
298 match &target_params.resume_type {
299 Some(resume_type) => {
300 initial_states.insert(
301 target_params.target.clone(),
302 target_params.clone().with_resume_type(resume_type.clone()),
303 );
304 }
305 None => {
306 let resume_type = self
307 .storage
308 .read_resume_state(&target_params.target)
309 .map_err(|err| {
310 FirestoreError::SystemError(FirestoreSystemError::new(
311 FirestoreErrorPublicGenericDetails::new("SystemError".into()),
312 format!("Listener init error: {err}"),
313 ))
314 })
315 .await?;
316 initial_states.insert(
317 target_params.target.clone(),
318 target_params.clone().opt_resume_type(resume_type),
319 );
320 }
321 }
322 }
323
324 if initial_states.is_empty() {
325 warn!("No initial states for listener targets. Exiting...");
326 return Ok(());
327 }
328
329 let (tx, rx): (UnboundedSender<i8>, UnboundedReceiver<i8>) =
330 tokio::sync::mpsc::unbounded_channel();
331
332 self.shutdown_writer = Some(Arc::new(tx));
333 self.shutdown_handle = Some(tokio::spawn(Self::listener_loop(
334 self.db.clone(),
335 self.storage.clone(),
336 self.shutdown_flag.clone(),
337 initial_states,
338 self.listener_params.clone(),
339 rx,
340 cb,
341 )));
342 Ok(())
343 }
344
345 pub async fn shutdown(&mut self) -> FirestoreResult<()> {
346 debug!("Shutting down Firestore listener...");
347 self.shutdown_flag.store(true, Ordering::Relaxed);
348 if let Some(shutdown_writer) = self.shutdown_writer.take() {
349 shutdown_writer.send(1).ok();
350 }
351 if let Some(signaller) = self.shutdown_handle.take() {
352 if let Err(err) = signaller.await {
353 warn!(%err, "Firestore listener exit error!");
354 };
355 }
356 debug!("Shutting down Firestore listener has been finished...");
357 Ok(())
358 }
359
360 async fn listener_loop<FN, F>(
361 db: D,
362 storage: S,
363 shutdown_flag: Arc<AtomicBool>,
364 mut targets_state: HashMap<FirestoreListenerTarget, FirestoreListenerTargetParams>,
365 listener_params: FirestoreListenerParams,
366 mut shutdown_receiver: UnboundedReceiver<i8>,
367 cb: FN,
368 ) where
369 D: FirestoreListenSupport + Clone + Send + Sync,
370 FN: Fn(FirestoreListenEvent) -> F + Send + Sync,
371 F: Future<Output = AnyBoxedErrResult<()>> + Send,
372 {
373 let effective_delay = listener_params
374 .retry_delay
375 .unwrap_or_else(|| std::time::Duration::from_secs(5));
376
377 while !shutdown_flag.load(Ordering::Relaxed) {
378 debug!(
379 num_targets = targets_state.len(),
380 "Start listening on targets..."
381 );
382
383 match db
384 .listen_doc_changes(targets_state.values().cloned().collect())
385 .await
386 {
387 Err(err) => {
388 if Self::check_listener_if_permanent_error(err, effective_delay).await {
389 shutdown_flag.store(true, Ordering::Relaxed);
390 }
391 }
392 Ok(mut listen_stream) => loop {
393 tokio::select! {
394 shutdown_trigger = shutdown_receiver.recv() => {
395 if shutdown_trigger.is_none() {
396 debug!("Listener dropped. Exiting...");
397 shutdown_flag.store(true, Ordering::Relaxed);
398 }
399 debug!(num_targets = targets_state.len(), "Exiting from listener on targets...");
400 shutdown_receiver.close();
401 break;
402 }
403 tried = listen_stream.try_next() => {
404 if shutdown_flag.load(Ordering::Relaxed) {
405 break;
406 }
407 else {
408 match tried {
409 Ok(Some(event)) => {
410 trace!(?event, "Received a listen response event to handle.");
411
412 match event.response_type {
413 Some(listen_response::ResponseType::TargetChange(ref target_change))
414 if !target_change.resume_token.is_empty() =>
415 {
416 for target_id_num in &target_change.target_ids {
417 match FirestoreListenerTarget::try_from(*target_id_num) {
418 Ok(target_id) => {
419 if let Some(target) = targets_state.get_mut(&target_id) {
420 let new_token: FirestoreListenerToken = target_change.resume_token.clone().into();
421
422 if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await {
423 error!(%err, "Listener token storage error occurred.");
424 break;
425 }
426 else {
427 target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token))
428 }
429 }
430 },
431 Err(err) => {
432 error!(%err, target_id_num, "Listener system error - unexpected target ID.");
433 break;
434 }
435 }
436 }
437
438 }
439 Some(response_type) => {
440 if let Err(err) = cb(response_type).await {
441 error!(%err, "Listener callback function error occurred.");
442 break;
443 }
444 }
445 None => {}
446 }
447 }
448 Ok(None) => break,
449 Err(err) => {
450 if Self::check_listener_if_permanent_error(err, effective_delay).await {
451 shutdown_flag.store(true, Ordering::Relaxed);
452 }
453 break;
454 }
455 }
456 }
457 }
458 }
459 },
460 }
461 }
462 }
463
464 async fn check_listener_if_permanent_error(
465 err: FirestoreError,
466 delay: std::time::Duration,
467 ) -> bool {
468 match err {
469 FirestoreError::DatabaseError(ref db_err)
470 if db_err.details.contains("unexpected end of file")
471 || db_err.details.contains("stream error received") =>
472 {
473 debug!(%err, ?delay, "Listen EOF.. Restarting after the specified delay...");
474 tokio::time::sleep(delay).await;
475 false
476 }
477 FirestoreError::DatabaseError(ref db_err)
478 if db_err.public.code.contains("InvalidArgument") =>
479 {
480 error!(%err, "Listen error. Exiting...");
481 true
482 }
483 FirestoreError::InvalidParametersError(_) => {
484 error!(%err, "Listen error. Exiting...");
485 true
486 }
487 _ => {
488 error!(%err, ?delay, "Listen error. Restarting after the specified delay...");
489 tokio::time::sleep(delay).await;
490 false
491 }
492 }
493 }
494}