1use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use atomr_core::actor::{ActorRef, ActorSystem, Props};
11use atomr_core::pattern::RetrySchedule;
12use atomr_persistence::{Journal, RecoveryPermitter, SnapshotPolicy, SnapshotStore};
13use atomr_persistence_query::ReadJournal;
14use futures::future::BoxFuture;
15use tokio::sync::{Mutex, RwLock};
16
17use crate::bus::BusHandles;
18use crate::cqrs::command_gateway::{CommandEnvelope, CommandGateway, SnapshotConfig};
19use crate::cqrs::event_codec::EventCodecRegistry;
20use crate::cqrs::projection::ProjectionHandle;
21use crate::cqrs::reader::{Reader, ReaderFilter};
22use crate::ddd::Repository;
23use crate::extensions::{CommandInterceptor, EventListener, ExtensionSlots};
24use crate::topology::Topology;
25use crate::{AggregateRoot, Command, DomainEvent, PatternError};
26
27pub struct CqrsPattern<A>(PhantomData<A>);
30
31impl<A> CqrsPattern<A>
32where
33 A: AggregateRoot,
34 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
35 A::Event: DomainEvent,
36{
37 pub fn builder<J: Journal>(journal: Arc<J>) -> CqrsBuilder<A, J> {
45 CqrsBuilder::new(journal)
46 }
47}
48
49pub struct CqrsBuilder<A, J>
51where
52 A: AggregateRoot,
53 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
54 A::Event: DomainEvent,
55 J: Journal,
56{
57 name: Option<String>,
58 factory: Option<Arc<dyn Fn(<A as AggregateRoot>::Id) -> A + Send + Sync>>,
59 journal: Arc<J>,
60 read_journal: Option<Arc<dyn ReadJournal>>,
61 recovery_permits: usize,
62 writer_uuid: String,
63 poll_interval: Duration,
64 repo_timeout: Duration,
65 extensions: ExtensionSlots<A::Command, A::Event, A::Error>,
66 readers: Vec<Box<dyn ErasedReader<A::Event>>>,
67 rebuild_contexts: Vec<RebuildContext<A::Event>>,
68 snapshot_store: Option<Arc<dyn SnapshotStore>>,
69 snapshot_policy: SnapshotPolicy,
70 snapshot_keep_last: usize,
71 shards: usize,
72 event_codecs: Option<Arc<EventCodecRegistry<A::Event>>>,
73 reader_retry: Option<(u32, RetrySchedule)>,
74 event_bus: Option<BusHandles<A::Event>>,
75 dedupe_window: usize,
76}
77
78impl<A, J> CqrsBuilder<A, J>
79where
80 A: AggregateRoot,
81 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
82 A::Event: DomainEvent,
83 J: Journal,
84{
85 fn new(journal: Arc<J>) -> Self {
86 Self {
87 name: None,
88 factory: None,
89 journal,
90 read_journal: None,
91 recovery_permits: 8,
92 writer_uuid: format!("cqrs-{}", rand_writer_id()),
93 poll_interval: Duration::from_millis(50),
94 repo_timeout: Duration::from_secs(5),
95 extensions: ExtensionSlots::default(),
96 readers: Vec::new(),
97 rebuild_contexts: Vec::new(),
98 snapshot_store: None,
99 snapshot_policy: SnapshotPolicy::Manual,
100 snapshot_keep_last: 1,
101 shards: 1,
102 event_codecs: None,
103 reader_retry: None,
104 event_bus: None,
105 dedupe_window: 0,
106 }
107 }
108
109 pub fn dedupe_window(mut self, n: usize) -> Self {
116 self.dedupe_window = n;
117 self
118 }
119
120 pub fn with_event_codecs(mut self, registry: EventCodecRegistry<A::Event>) -> Self {
124 self.event_codecs = Some(Arc::new(registry));
125 self
126 }
127
128 pub fn with_reader_retry(mut self, max_attempts: u32, schedule: RetrySchedule) -> Self {
133 self.reader_retry = Some((max_attempts.max(1), schedule));
134 self
135 }
136
137 pub fn with_event_bus(mut self, bus: BusHandles<A::Event>) -> Self {
143 self.event_bus = Some(bus);
144 self
145 }
146
147 pub fn shards(mut self, n: usize) -> Self {
154 self.shards = n.max(1);
155 self
156 }
157
158 pub fn snapshot_store<S: SnapshotStore + ?Sized>(mut self, store: Arc<S>) -> Self
163 where
164 Arc<S>: Into<Arc<dyn SnapshotStore>>,
165 {
166 self.snapshot_store = Some(store.into());
167 self
168 }
169
170 pub fn snapshot_policy(mut self, policy: SnapshotPolicy) -> Self {
173 self.snapshot_policy = policy;
174 self
175 }
176
177 pub fn snapshot_keep_last(mut self, n: usize) -> Self {
179 self.snapshot_keep_last = n.max(1);
180 self
181 }
182
183 pub fn name(mut self, name: impl Into<String>) -> Self {
186 self.name = Some(name.into());
187 self
188 }
189
190 pub fn factory<F>(mut self, factory: F) -> Self
194 where
195 F: Fn(<A as AggregateRoot>::Id) -> A + Send + Sync + 'static,
196 {
197 self.factory = Some(Arc::new(factory));
198 self
199 }
200
201 pub fn read_journal<R: ReadJournal>(mut self, rj: Arc<R>) -> Self {
204 self.read_journal = Some(rj);
205 self
206 }
207
208 pub fn recovery_permits(mut self, n: usize) -> Self {
210 self.recovery_permits = n;
211 self
212 }
213
214 pub fn poll_interval(mut self, d: Duration) -> Self {
216 self.poll_interval = d;
217 self
218 }
219
220 pub fn repository_timeout(mut self, d: Duration) -> Self {
222 self.repo_timeout = d;
223 self
224 }
225
226 pub fn writer_uuid(mut self, w: impl Into<String>) -> Self {
228 self.writer_uuid = w.into();
229 self
230 }
231
232 pub fn on_command<F>(mut self, hook: F) -> Self
237 where
238 F: Fn(&A::Command) -> Result<(), PatternError<A::Error>> + Send + Sync + 'static,
239 {
240 let hook: CommandInterceptor<A::Command, A::Error> = Arc::new(hook);
241 self.extensions.command_interceptors.push(hook);
242 self
243 }
244
245 pub fn on_event<F>(mut self, hook: F) -> Self
249 where
250 F: Fn(&A::Event) + Send + Sync + 'static,
251 {
252 let hook: EventListener<A::Event> = Arc::new(hook);
253 self.extensions.event_listeners.push(hook);
254 self
255 }
256
257 pub fn tap_events(mut self, tx: tokio::sync::mpsc::UnboundedSender<A::Event>) -> Self {
261 self.extensions.event_taps.push(tx);
262 self
263 }
264
265 pub fn with_reader<R>(mut self, reader: R) -> (Self, ProjectionHandle<R::Projection>)
269 where
270 R: Reader<Event = A::Event>,
271 {
272 let name = reader.name().to_string();
273 let filter = reader.filter();
274 let state: Arc<RwLock<R::Projection>> = Arc::new(RwLock::new(R::Projection::default()));
275 let offset = Arc::new(AtomicU64::new(0));
276 let handle = ProjectionHandle { state: state.clone(), offset: offset.clone() };
277 let spec = ReaderSpec::<R> {
278 reader: Arc::new(Mutex::new(reader)),
279 state,
280 offset: offset.clone(),
281 name: name.clone(),
282 filter,
283 };
284 let ctx = spec.rebuild_context();
285 self.rebuild_contexts.push(ctx);
286 self.readers.push(Box::new(spec));
287 (self, handle)
288 }
289
290 pub fn build(self) -> Result<CqrsTopology<A, J>, PatternError<A::Error>> {
294 let factory = self.factory.ok_or(PatternError::NotConfigured("factory"))?;
295 if !self.readers.is_empty() && self.read_journal.is_none() && self.event_bus.is_none() {
296 return Err(PatternError::NotConfigured("read_journal"));
297 }
298 let snapshot = self.snapshot_store.map(|store| SnapshotConfig {
299 store,
300 policy: self.snapshot_policy,
301 keep_last: self.snapshot_keep_last,
302 });
303
304 let mut extensions = self.extensions;
307 if let Some(bus) = self.event_bus.as_ref() {
308 let bus_for_listener = bus.clone();
309 let listener: crate::extensions::EventListener<A::Event> =
310 Arc::new(move |e: &A::Event| bus_for_listener.publish(e.clone()));
311 extensions.event_listeners.push(listener);
312 }
313
314 Ok(CqrsTopology {
315 name: self.name.unwrap_or_else(|| "cqrs".into()),
316 factory,
317 journal: self.journal,
318 read_journal: self.read_journal,
319 recovery_permits: self.recovery_permits,
320 writer_uuid: self.writer_uuid,
321 poll_interval: self.poll_interval,
322 repo_timeout: self.repo_timeout,
323 extensions,
324 readers: self.readers,
325 rebuild_contexts: self.rebuild_contexts,
326 snapshot,
327 shards: self.shards,
328 event_codecs: self.event_codecs,
329 reader_retry: self.reader_retry,
330 event_bus: self.event_bus,
331 dedupe_window: self.dedupe_window,
332 })
333 }
334}
335
336pub struct CqrsTopology<A, J>
340where
341 A: AggregateRoot,
342 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
343 A::Event: DomainEvent,
344 J: Journal,
345{
346 name: String,
347 factory: Arc<dyn Fn(<A as AggregateRoot>::Id) -> A + Send + Sync>,
348 journal: Arc<J>,
349 read_journal: Option<Arc<dyn ReadJournal>>,
350 recovery_permits: usize,
351 writer_uuid: String,
352 poll_interval: Duration,
353 repo_timeout: Duration,
354 extensions: ExtensionSlots<A::Command, A::Event, A::Error>,
355 readers: Vec<Box<dyn ErasedReader<A::Event>>>,
356 rebuild_contexts: Vec<RebuildContext<A::Event>>,
357 snapshot: Option<SnapshotConfig>,
358 shards: usize,
359 event_codecs: Option<Arc<EventCodecRegistry<A::Event>>>,
360 reader_retry: Option<(u32, RetrySchedule)>,
361 event_bus: Option<BusHandles<A::Event>>,
362 dedupe_window: usize,
363}
364
365#[async_trait]
366impl<A, J> Topology for CqrsTopology<A, J>
367where
368 A: AggregateRoot,
369 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
370 A::Event: DomainEvent,
371 J: Journal,
372{
373 type Handles = CqrsHandles<A>;
374
375 async fn materialize(self, system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
376 let factory = self.factory.clone();
380 let journal = self.journal.clone();
381 let permits = Arc::new(RecoveryPermitter::new(self.recovery_permits));
382 let writer_uuid = self.writer_uuid.clone();
383 let extensions = self.extensions.clone();
384 let snapshot = self.snapshot.clone();
385 let shards = self.shards.max(1);
386 let dedupe_window = self.dedupe_window;
387
388 let mut gateways: Vec<ActorRef<CommandEnvelope<A>>> = Vec::with_capacity(shards);
389 for shard_idx in 0..shards {
390 let factory = factory.clone();
391 let journal = journal.clone();
392 let permits = permits.clone();
393 let writer_uuid = writer_uuid.clone();
394 let extensions = extensions.clone();
395 let snapshot = snapshot.clone();
396 let actor_name =
397 if shards == 1 { self.name.clone() } else { format!("{}-shard-{shard_idx}", self.name) };
398 let actor_ref = system
399 .actor_of(
400 Props::create(move || CommandGateway::<A, J> {
401 factory: factory.clone(),
402 journal: journal.clone(),
403 permits: permits.clone(),
404 writer_uuid: writer_uuid.clone(),
405 entities: HashMap::new(),
406 extensions: extensions.clone(),
407 snapshot: snapshot.clone(),
408 dedupe_window,
409 }),
410 &actor_name,
411 )
412 .map_err(|e| PatternError::Invariant(format!("spawn gateway: {e}")))?;
413 gateways.push(actor_ref);
414 }
415
416 let read_journal = self.read_journal.clone();
419 let bus = self.event_bus.clone();
423 let codecs = self.event_codecs.clone();
424 let retry_cfg = self.reader_retry;
425 if !self.readers.is_empty() {
426 let interval = self.poll_interval;
427 if let Some(bus_handles) = bus {
428 for spec in self.readers {
429 let rx = bus_handles.subscribe();
430 tokio::spawn(run_reader_live(spec, rx, retry_cfg));
431 }
432 } else if let Some(rj) = read_journal.clone() {
433 for spec in self.readers {
434 let codecs = codecs.clone();
435 tokio::spawn(run_reader_poll(spec, rj.clone(), interval, codecs, retry_cfg));
436 }
437 } else {
438 return Err(PatternError::Invariant(
441 "readers configured without an event bus or a read journal".into(),
442 ));
443 }
444 }
445
446 let repo: Arc<dyn Repository<Aggregate = A>> =
447 Arc::new(ShardedRepository::<A> { gateways, timeout: self.repo_timeout });
448
449 let mut rebuilds: HashMap<String, RebuildFn> = HashMap::new();
453 let rebuild_journal = read_journal.clone();
454 let rebuild_codecs = self.event_codecs.clone();
455 for ctx in self.rebuild_contexts {
456 let journal = rebuild_journal.clone();
457 let codecs = rebuild_codecs.clone();
458 let name = ctx.name.clone();
459 let f: RebuildFn = Arc::new(move || {
460 let ctx = RebuildContext {
461 name: ctx.name.clone(),
462 state_reset: ctx.state_reset.clone(),
463 apply: ctx.apply.clone(),
464 filter: ctx.filter.clone(),
465 offset: ctx.offset.clone(),
466 };
467 let journal = journal.clone();
468 let codecs = codecs.clone();
469 Box::pin(async move {
470 let Some(rj) = journal else {
471 return Err("rebuild_projection requires a read_journal".into());
472 };
473 rebuild_one_projection(ctx, rj, codecs).await
474 })
475 });
476 rebuilds.insert(name, f);
477 }
478
479 Ok(CqrsHandles { repository: repo, rebuilds })
480 }
481}
482
483async fn rebuild_one_projection<E: Send + Clone + 'static>(
484 ctx: RebuildContext<E>,
485 rj: Arc<dyn ReadJournal>,
486 codecs: Option<Arc<EventCodecRegistry<E>>>,
487) -> Result<(), String> {
488 (ctx.state_reset)().await;
489 let pids = match &ctx.filter {
490 ReaderFilter::All | ReaderFilter::Tag(_) => {
491 rj.all_persistence_ids().await.map_err(|e| format!("list pids: {e:?}"))?
492 }
493 ReaderFilter::PersistenceId(id) => vec![id.clone()],
494 ReaderFilter::PersistenceIds(ids) => ids.clone(),
495 };
496 let mut max_seq: u64 = 0;
497 for pid in pids {
498 let events = rj
499 .events_by_persistence_id(&pid, 1, u64::MAX)
500 .await
501 .map_err(|e| format!("read pid {pid}: {e:?}"))?;
502 for env in events {
503 if let ReaderFilter::Tag(t) = &ctx.filter {
504 if !env.tags.iter().any(|x| x == t) {
505 continue;
506 }
507 }
508 let decoded =
509 codecs.as_ref().and_then(|r| r.decode(&env.manifest, &env.payload)).ok_or_else(|| {
510 format!("no decoder for manifest `{}` (configure EventCodecRegistry)", env.manifest)
511 })?;
512 let event = decoded?;
513 (ctx.apply)(event).await.map_err(|e| format!("apply during rebuild: {e}"))?;
514 if env.sequence_nr > max_seq {
515 max_seq = env.sequence_nr;
516 }
517 }
518 }
519 ctx.offset.store(max_seq, Ordering::Release);
520 Ok(())
521}
522
523pub struct CqrsHandles<A>
525where
526 A: AggregateRoot,
527 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
528 A::Event: DomainEvent,
529{
530 repository: Arc<dyn Repository<Aggregate = A>>,
531 rebuilds: HashMap<String, RebuildFn>,
532}
533
534type RebuildFn = Arc<dyn Fn() -> BoxFuture<'static, Result<(), String>> + Send + Sync + 'static>;
535
536impl<A> CqrsHandles<A>
537where
538 A: AggregateRoot,
539 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
540 A::Event: DomainEvent,
541{
542 pub fn repository(&self) -> Arc<dyn Repository<Aggregate = A>> {
544 self.repository.clone()
545 }
546
547 pub async fn rebuild_projection(&self, name: &str) -> Result<(), String> {
553 let f = self.rebuilds.get(name).ok_or_else(|| format!("no reader named `{name}`"))?.clone();
554 f().await
555 }
556}
557
558struct ShardedRepository<A>
561where
562 A: AggregateRoot,
563 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
564 A::Event: DomainEvent,
565{
566 gateways: Vec<ActorRef<CommandEnvelope<A>>>,
567 timeout: Duration,
568}
569
570#[async_trait]
571impl<A> Repository for ShardedRepository<A>
572where
573 A: AggregateRoot,
574 A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
575 A::Event: DomainEvent,
576{
577 type Aggregate = A;
578
579 async fn send(&self, cmd: A::Command) -> Result<Vec<A::Event>, PatternError<A::Error>> {
580 let id = cmd.aggregate_id();
581 let idx = shard_index(&id, self.gateways.len());
582 match self.gateways[idx].ask_with(|reply| CommandEnvelope { cmd, reply }, self.timeout).await {
583 Ok(inner) => inner,
584 Err(ask) => Err(PatternError::Ask(ask)),
585 }
586 }
587}
588
589fn shard_index<I: std::hash::Hash>(id: &I, n: usize) -> usize {
590 use std::hash::Hasher;
591 let mut h = std::collections::hash_map::DefaultHasher::new();
592 id.hash(&mut h);
593 (h.finish() as usize) % n.max(1)
594}
595
596trait ErasedReader<E>: Send + Sync + 'static {
597 fn name(&self) -> String;
598 fn filter(&self) -> ReaderFilter;
599 fn offset(&self) -> Arc<AtomicU64>;
600 fn decode_payload(&self, bytes: &[u8]) -> Result<E, String>;
601 fn apply<'a>(&'a self, event: E) -> BoxFuture<'a, Result<(), String>>;
602}
603
604struct ReaderSpec<R: Reader> {
605 reader: Arc<Mutex<R>>,
606 state: Arc<RwLock<R::Projection>>,
607 offset: Arc<AtomicU64>,
608 name: String,
609 filter: ReaderFilter,
610}
611
612impl<R: Reader> ErasedReader<R::Event> for ReaderSpec<R> {
613 fn name(&self) -> String {
614 self.name.clone()
615 }
616 fn filter(&self) -> ReaderFilter {
617 self.filter.clone()
618 }
619 fn offset(&self) -> Arc<AtomicU64> {
620 self.offset.clone()
621 }
622 fn decode_payload(&self, bytes: &[u8]) -> Result<R::Event, String> {
623 R::decode(bytes)
624 }
625 fn apply<'a>(&'a self, event: R::Event) -> BoxFuture<'a, Result<(), String>> {
626 let state = self.state.clone();
627 let reader = self.reader.clone();
628 Box::pin(async move {
629 let mut state = state.write().await;
630 let mut reader = reader.lock().await;
631 reader.apply(&mut *state, event).await.map_err(|e| e.to_string())
632 })
633 }
634}
635
636type ResetFn = Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>;
637type ApplyFn<E> = Arc<dyn Fn(E) -> BoxFuture<'static, Result<(), String>> + Send + Sync>;
638
639struct RebuildContext<E: Send + Clone + 'static> {
643 name: String,
644 state_reset: ResetFn,
645 apply: ApplyFn<E>,
646 filter: ReaderFilter,
647 offset: Arc<AtomicU64>,
648}
649
650impl<R: Reader> ReaderSpec<R> {
651 fn rebuild_context(&self) -> RebuildContext<R::Event> {
652 let state = self.state.clone();
653 let offset = self.offset.clone();
654 let reader = self.reader.clone();
655 let state_clone = state.clone();
656 let offset_clone = offset.clone();
657 let reader_clone = reader.clone();
658 let state_reset: ResetFn = Arc::new(move || {
659 let state = state_clone.clone();
660 let offset = offset_clone.clone();
661 Box::pin(async move {
662 *state.write().await = R::Projection::default();
663 offset.store(0, Ordering::Release);
664 })
665 });
666 let apply: ApplyFn<R::Event> = Arc::new(move |event: R::Event| {
667 let state = state.clone();
668 let reader = reader_clone.clone();
669 Box::pin(async move {
670 let mut state = state.write().await;
671 let mut reader = reader.lock().await;
672 reader.apply(&mut *state, event).await.map_err(|e| e.to_string())
673 })
674 });
675 RebuildContext { name: self.name.clone(), state_reset, apply, filter: self.filter.clone(), offset }
676 }
677}
678
679async fn run_reader_poll<E: Send + Clone + 'static>(
680 reader: Box<dyn ErasedReader<E>>,
681 read_journal: Arc<dyn ReadJournal>,
682 poll_interval: Duration,
683 codecs: Option<Arc<EventCodecRegistry<E>>>,
684 retry: Option<(u32, RetrySchedule)>,
685) {
686 let mut pid_offsets: HashMap<String, u64> = HashMap::new();
687 let offset_handle = reader.offset();
688 let filter = reader.filter();
689 let name = reader.name();
690
691 loop {
692 let pids = match resolve_pids(&filter, &*read_journal).await {
693 Ok(p) => p,
694 Err(e) => {
695 tracing::warn!(reader = %name, error = ?e, "list pids failed");
696 tokio::time::sleep(poll_interval).await;
697 continue;
698 }
699 };
700
701 let mut max_seq_seen = offset_handle.load(Ordering::Acquire);
702
703 for pid in pids {
704 let from = pid_offsets.get(&pid).copied().unwrap_or(0).saturating_add(1);
705 let events = match read_journal.events_by_persistence_id(&pid, from, u64::MAX).await {
706 Ok(e) => e,
707 Err(e) => {
708 tracing::warn!(reader = %name, pid = %pid, error = ?e, "read failed");
709 continue;
710 }
711 };
712
713 for env in events {
714 if let ReaderFilter::Tag(t) = &filter {
715 if !env.tags.iter().any(|x| x == t) {
716 pid_offsets.insert(pid.clone(), env.sequence_nr);
717 continue;
718 }
719 }
720
721 let decoded = codecs
722 .as_ref()
723 .and_then(|r| r.decode(&env.manifest, &env.payload))
724 .unwrap_or_else(|| reader.decode_payload(&env.payload));
725
726 match decoded {
727 Ok(event) => {
728 apply_with_retry(&*reader, event, retry, &name).await;
729 pid_offsets.insert(pid.clone(), env.sequence_nr);
730 if env.sequence_nr > max_seq_seen {
731 max_seq_seen = env.sequence_nr;
732 }
733 }
734 Err(s) => {
735 tracing::warn!(reader = %name, error = %s, "decode failed");
736 pid_offsets.insert(pid.clone(), env.sequence_nr);
737 }
738 }
739 }
740 }
741
742 offset_handle.store(max_seq_seen, Ordering::Release);
743 tokio::time::sleep(poll_interval).await;
744 }
745}
746
747async fn run_reader_live<E: Send + Clone + 'static>(
748 reader: Box<dyn ErasedReader<E>>,
749 mut rx: tokio::sync::mpsc::UnboundedReceiver<E>,
750 retry: Option<(u32, RetrySchedule)>,
751) {
752 let name = reader.name();
753 while let Some(event) = rx.recv().await {
754 apply_with_retry(&*reader, event, retry, &name).await;
755 }
756}
757
758async fn apply_with_retry<E: Send + Clone + 'static>(
759 reader: &dyn ErasedReader<E>,
760 event: E,
761 retry: Option<(u32, RetrySchedule)>,
762 name: &str,
763) {
764 let result = if let Some((max_attempts, sched)) = retry {
765 let mut last: Option<String> = None;
766 for attempt in 0..max_attempts {
767 match reader.apply(event.clone()).await {
768 Ok(()) => return,
769 Err(e) => {
770 last = Some(e);
771 if attempt + 1 < max_attempts {
772 tokio::time::sleep(sched.delay_for(attempt)).await;
773 }
774 }
775 }
776 }
777 Err(last.unwrap_or_else(|| "unknown".into()))
778 } else {
779 reader.apply(event).await
780 };
781 if let Err(err) = result {
782 tracing::warn!(reader = %name, error = %err, "apply failed (retries exhausted)");
783 }
784}
785
786async fn resolve_pids(
787 filter: &ReaderFilter,
788 rj: &dyn ReadJournal,
789) -> Result<Vec<String>, atomr_persistence::JournalError> {
790 match filter {
791 ReaderFilter::All | ReaderFilter::Tag(_) => rj.all_persistence_ids().await,
792 ReaderFilter::PersistenceId(id) => Ok(vec![id.clone()]),
793 ReaderFilter::PersistenceIds(ids) => Ok(ids.clone()),
794 }
795}
796
797fn rand_writer_id() -> String {
798 use std::time::{SystemTime, UNIX_EPOCH};
799 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
800 format!("{nanos:x}")
801}