atomr_patterns/cqrs/builder.rs

//! [`CqrsPattern`], [`CqrsBuilder`], [`CqrsTopology`], [`CqrsHandles`].

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use atomr_core::actor::{ActorRef, ActorSystem, Props};
use atomr_core::pattern::RetrySchedule;
use atomr_persistence::{Journal, RecoveryPermitter, SnapshotPolicy, SnapshotStore};
use atomr_persistence_query::ReadJournal;
use futures::future::BoxFuture;
use tokio::sync::{Mutex, RwLock};

use crate::bus::BusHandles;
use crate::cqrs::command_gateway::{CommandEnvelope, CommandGateway, SnapshotConfig};
use crate::cqrs::event_codec::EventCodecRegistry;
use crate::cqrs::projection::ProjectionHandle;
use crate::cqrs::reader::{Reader, ReaderFilter};
use crate::ddd::Repository;
use crate::extensions::{CommandInterceptor, EventListener, ExtensionSlots};
use crate::topology::Topology;
use crate::{AggregateRoot, Command, DomainEvent, PatternError};

/// Public, zero-sized handle to the CQRS pattern. Use
/// [`CqrsPattern::builder`] to start configuring an instance.
pub struct CqrsPattern<A>(PhantomData<A>);

impl<A> CqrsPattern<A>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
{
    /// Start a fluent builder for a CQRS instance backed by `journal`.
    ///
    /// The journal type `J` flows through the builder, so every
    /// subsequent configuration step is type-checked against it. Call
    /// [`CqrsBuilder::factory`], [`CqrsBuilder::read_journal`] (if you
    /// want readers), and [`CqrsBuilder::build`] to obtain a
    /// [`CqrsTopology`].
    pub fn builder<J: Journal>(journal: Arc<J>) -> CqrsBuilder<A, J> {
        CqrsBuilder::new(journal)
    }
}

/// Fluent builder for a CQRS instance.
pub struct CqrsBuilder<A, J>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
    J: Journal,
{
    name: Option<String>,
    factory: Option<Arc<dyn Fn(<A as AggregateRoot>::Id) -> A + Send + Sync>>,
    journal: Arc<J>,
    read_journal: Option<Arc<dyn ReadJournal>>,
    recovery_permits: usize,
    writer_uuid: String,
    poll_interval: Duration,
    repo_timeout: Duration,
    extensions: ExtensionSlots<A::Command, A::Event, A::Error>,
    readers: Vec<Box<dyn ErasedReader<A::Event>>>,
    rebuild_contexts: Vec<RebuildContext<A::Event>>,
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    snapshot_policy: SnapshotPolicy,
    snapshot_keep_last: usize,
    shards: usize,
    event_codecs: Option<Arc<EventCodecRegistry<A::Event>>>,
    reader_retry: Option<(u32, RetrySchedule)>,
    event_bus: Option<BusHandles<A::Event>>,
    dedupe_window: usize,
}

impl<A, J> CqrsBuilder<A, J>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
    J: Journal,
{
    fn new(journal: Arc<J>) -> Self {
        Self {
            name: None,
            factory: None,
            journal,
            read_journal: None,
            recovery_permits: 8,
            writer_uuid: format!("cqrs-{}", rand_writer_id()),
            poll_interval: Duration::from_millis(50),
            repo_timeout: Duration::from_secs(5),
            extensions: ExtensionSlots::default(),
            readers: Vec::new(),
            rebuild_contexts: Vec::new(),
            snapshot_store: None,
            snapshot_policy: SnapshotPolicy::Manual,
            snapshot_keep_last: 1,
            shards: 1,
            event_codecs: None,
            reader_retry: None,
            event_bus: None,
            dedupe_window: 0,
        }
    }

    /// Cap on the per-aggregate command-id dedupe ring. `0` (default)
    /// disables dedupe — every command runs through the handler.
    /// Non-zero enables idempotent retries: commands carrying a
    /// previously-seen [`crate::Command::command_id`] return the
    /// cached events without re-running the handler. v2 caches
    /// successes only; failed commands always re-execute.
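    ///
    /// A sketch of the idempotency this buys; `repo` is the handle
    /// from [`CqrsHandles::repository`] and `cmd` a (hypothetical)
    /// command whose `command_id` stays stable across retries:
    ///
    /// ```ignore
    /// let builder = builder.dedupe_window(64);    // remember last 64 command ids
    /// // ... later, against the materialized instance:
    /// let first = repo.send(cmd.clone()).await?;  // handler runs, events persisted
    /// let retry = repo.send(cmd).await?;          // same command_id: cached events,
    ///                                             // handler not re-run
    /// ```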
    pub fn dedupe_window(mut self, n: usize) -> Self {
        self.dedupe_window = n;
        self
    }

    /// Provide an [`EventCodecRegistry`] that decodes events based on
    /// their journal manifest. Lets you evolve event schemas without
    /// rewriting old events.
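    ///
    /// A sketch; the `register` method, the manifest string, and
    /// `decode_v1` are assumptions about [`EventCodecRegistry`]'s API:
    ///
    /// ```ignore
    /// let mut registry = EventCodecRegistry::new();
    /// registry.register("OrderPlaced.v1", |bytes| decode_v1(bytes));  // assumed API
    /// let builder = builder.with_event_codecs(registry);
    /// ```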
    pub fn with_event_codecs(mut self, registry: EventCodecRegistry<A::Event>) -> Self {
        self.event_codecs = Some(Arc::new(registry));
        self
    }

    /// Reader runners retry transient `apply` failures up to
    /// `max_attempts` times with the given backoff schedule.
    /// Default: no retry — failures are logged and the offset
    /// advances.
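    ///
    /// A sketch; the `RetrySchedule::exponential` constructor is an
    /// assumed API from `atomr_core::pattern`:
    ///
    /// ```ignore
    /// let builder =
    ///     builder.with_reader_retry(5, RetrySchedule::exponential(Duration::from_millis(10)));
    /// ```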
    pub fn with_reader_retry(mut self, max_attempts: u32, schedule: RetrySchedule) -> Self {
        self.reader_retry = Some((max_attempts.max(1), schedule));
        self
    }

    /// Wire a [`crate::bus::DomainEventBus`] into the gateway.
    /// Persisted events are published to the bus on success, and
    /// readers subscribe to the bus for live-tail delivery instead of
    /// polling. Lower latency than polling at the cost of in-process
    /// coupling to the bus's lifetime.
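    ///
    /// A sketch, assuming a `DomainEventBus::new()` constructor that
    /// hands back [`BusHandles`] (hypothetical; see the bus module
    /// for the actual constructor):
    ///
    /// ```ignore
    /// let bus: BusHandles<OrderEvent> = DomainEventBus::new();
    /// let builder = builder.with_event_bus(bus);
    /// // Readers registered on this builder now live-tail the bus
    /// // instead of polling a read journal.
    /// ```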
    pub fn with_event_bus(mut self, bus: BusHandles<A::Event>) -> Self {
        self.event_bus = Some(bus);
        self
    }

    /// Spawn `n` parallel command-gateway actors and route commands
    /// across them by hashing [`crate::Command::aggregate_id`]. Per-id
    /// FIFO ordering is preserved — every command for the same id
    /// reaches the same gateway. v2 supports intra-process sharding
    /// only; cross-node distribution via `atomr-cluster-sharding` is
    /// a v3 follow-on.
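    ///
    /// Routing is a plain hash of the aggregate id modulo the shard
    /// count, so every command for a given id lands on the same
    /// gateway actor (`Order` is a hypothetical aggregate):
    ///
    /// ```ignore
    /// let topology = CqrsPattern::<Order>::builder(journal)
    ///     .factory(Order::new)
    ///     .shards(4)   // spawns "cqrs-shard-0" .. "cqrs-shard-3"
    ///     .build()?;
    /// ```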
    pub fn shards(mut self, n: usize) -> Self {
        self.shards = n.max(1);
        self
    }

    /// Provide a [`SnapshotStore`]. When set together with
    /// [`AggregateRoot::encode_state`] / [`AggregateRoot::decode_state`],
    /// the gateway prefers snapshots on recovery and saves new ones
    /// according to [`Self::snapshot_policy`].
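    ///
    /// A sketch, assuming an `InMemorySnapshotStore` and an
    /// `EveryN`-style policy variant (both hypothetical names; only
    /// [`SnapshotPolicy`]'s `Manual` variant appears in this module):
    ///
    /// ```ignore
    /// let builder = builder
    ///     .snapshot_store(Arc::new(InMemorySnapshotStore::default()))
    ///     .snapshot_policy(SnapshotPolicy::EveryN(100))  // every 100 events
    ///     .snapshot_keep_last(2);                        // retain the two newest
    /// ```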
    pub fn snapshot_store<S: SnapshotStore + ?Sized>(mut self, store: Arc<S>) -> Self
    where
        Arc<S>: Into<Arc<dyn SnapshotStore>>,
    {
        self.snapshot_store = Some(store.into());
        self
    }

    /// Override the snapshot cadence policy. Default: `Manual` (no
    /// auto-snapshots; users must call `save_snapshot` themselves).
    pub fn snapshot_policy(mut self, policy: SnapshotPolicy) -> Self {
        self.snapshot_policy = policy;
        self
    }

    /// Cap on retained snapshots per persistence id. Default: 1.
    pub fn snapshot_keep_last(mut self, n: usize) -> Self {
        self.snapshot_keep_last = n.max(1);
        self
    }

    /// Set the user-guardian name for this pattern's root actor.
    /// Default: `"cqrs"`.
    pub fn name(mut self, name: impl Into<String>) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Provide a factory that constructs a fresh aggregate for a given id.
    /// The framework calls this lazily — once per id — and reuses the
    /// instance for every subsequent command targeting that id.
    pub fn factory<F>(mut self, factory: F) -> Self
    where
        F: Fn(<A as AggregateRoot>::Id) -> A + Send + Sync + 'static,
    {
        self.factory = Some(Arc::new(factory));
        self
    }

    /// Provide the read-side journal that readers subscribe to. Required
    /// only if you register any readers via [`Self::with_reader`].
    pub fn read_journal<R: ReadJournal>(mut self, rj: Arc<R>) -> Self {
        self.read_journal = Some(rj);
        self
    }

    /// Cap on concurrently-recovering aggregates. Default: 8.
    pub fn recovery_permits(mut self, n: usize) -> Self {
        self.recovery_permits = n;
        self
    }

    /// How often the reader runners poll the read journal. Default: 50ms.
    pub fn poll_interval(mut self, d: Duration) -> Self {
        self.poll_interval = d;
        self
    }

    /// Timeout the [`Repository`] applies to each ask. Default: 5s.
    pub fn repository_timeout(mut self, d: Duration) -> Self {
        self.repo_timeout = d;
        self
    }

    /// Override the writer UUID stamped onto every persisted event.
    pub fn writer_uuid(mut self, w: impl Into<String>) -> Self {
        self.writer_uuid = w.into();
        self
    }

    /// Register a synchronous pre-handler interceptor (named slot
    /// `on_command`). Returning `Err` short-circuits the persist with
    /// [`PatternError::Intercepted`] (or any other variant the closure
    /// constructs).
    pub fn on_command<F>(mut self, hook: F) -> Self
    where
        F: Fn(&A::Command) -> Result<(), PatternError<A::Error>> + Send + Sync + 'static,
    {
        let hook: CommandInterceptor<A::Command, A::Error> = Arc::new(hook);
        self.extensions.command_interceptors.push(hook);
        self
    }

    /// Register a synchronous post-persist event listener (named slot
    /// `on_event`). Listeners run in the gateway's actor task; keep
    /// them fast — push to a tap if you need async work.
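    ///
    /// A sketch pairing an interceptor with a listener; the
    /// `OrderCommand` variant and the payload shape of
    /// [`PatternError::Intercepted`] are assumptions:
    ///
    /// ```ignore
    /// let builder = builder
    ///     .on_command(|cmd| match cmd {
    ///         OrderCommand::Place { qty, .. } if *qty == 0 => {
    ///             Err(PatternError::Intercepted("empty order".into()))
    ///         }
    ///         _ => Ok(()),
    ///     })
    ///     .on_event(|evt| tracing::debug!(?evt, "persisted"));
    /// ```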
    pub fn on_event<F>(mut self, hook: F) -> Self
    where
        F: Fn(&A::Event) + Send + Sync + 'static,
    {
        let hook: EventListener<A::Event> = Arc::new(hook);
        self.extensions.event_listeners.push(hook);
        self
    }

    /// Register an async event tap. The gateway pushes a clone of every
    /// successfully-persisted event into the channel. Closed receivers
    /// are pruned silently.
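    ///
    /// A sketch of draining a tap from its own task, keeping async
    /// work off the gateway's actor task:
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let builder = builder.tap_events(tx);
    /// tokio::spawn(async move {
    ///     while let Some(event) = rx.recv().await {
    ///         // async side effects go here
    ///     }
    /// });
    /// ```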
    pub fn tap_events(mut self, tx: tokio::sync::mpsc::UnboundedSender<A::Event>) -> Self {
        self.extensions.event_taps.push(tx);
        self
    }

    /// Register a [`Reader`] and receive a [`ProjectionHandle`] you
    /// can use later to read the projection state. The reader's
    /// `Event` type must equal the aggregate's `Event` type.
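    ///
    /// A sketch, assuming a `Reader` impl named `OrderTotals` whose
    /// projection tracks a count; the `read` accessor on
    /// [`ProjectionHandle`] is an assumed API:
    ///
    /// ```ignore
    /// let (builder, totals) = builder.with_reader(OrderTotals::default());
    /// // ... build, materialize, send commands ...
    /// let count = totals.read(|p| p.count).await;  // assumed accessor
    /// ```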
    pub fn with_reader<R>(mut self, reader: R) -> (Self, ProjectionHandle<R::Projection>)
    where
        R: Reader<Event = A::Event>,
    {
        let name = reader.name().to_string();
        let filter = reader.filter();
        let state: Arc<RwLock<R::Projection>> = Arc::new(RwLock::new(R::Projection::default()));
        let offset = Arc::new(AtomicU64::new(0));
        let handle = ProjectionHandle { state: state.clone(), offset: offset.clone() };
        let spec = ReaderSpec::<R> {
            reader: Arc::new(Mutex::new(reader)),
            state,
            offset: offset.clone(),
            name: name.clone(),
            filter,
        };
        let ctx = spec.rebuild_context();
        self.rebuild_contexts.push(ctx);
        self.readers.push(Box::new(spec));
        (self, handle)
    }

    /// Finalize the builder. Returns a [`CqrsTopology`] that you call
    /// [`Topology::materialize`] on to spawn the actors and start the
    /// readers.
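    ///
    /// Two invariants are checked here: a factory must be set, and
    /// registered readers need either a read journal or an event bus
    /// (`Order` and `OrderTotals` are hypothetical types):
    ///
    /// ```ignore
    /// let (b, _totals) = CqrsPattern::<Order>::builder(journal)
    ///     .factory(Order::new)
    ///     .with_reader(OrderTotals::default());
    /// // Neither read_journal(..) nor with_event_bus(..) was called:
    /// assert!(matches!(b.build(), Err(PatternError::NotConfigured("read_journal"))));
    /// ```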
    pub fn build(self) -> Result<CqrsTopology<A, J>, PatternError<A::Error>> {
        let factory = self.factory.ok_or(PatternError::NotConfigured("factory"))?;
        if !self.readers.is_empty() && self.read_journal.is_none() && self.event_bus.is_none() {
            return Err(PatternError::NotConfigured("read_journal"));
        }
        let snapshot = self.snapshot_store.map(|store| SnapshotConfig {
            store,
            policy: self.snapshot_policy,
            keep_last: self.snapshot_keep_last,
        });

        // Plumb the bus into the gateway as a tap so persisted events
        // flow to the bus automatically.
        let mut extensions = self.extensions;
        if let Some(bus) = self.event_bus.as_ref() {
            let bus_for_listener = bus.clone();
            let listener: crate::extensions::EventListener<A::Event> =
                Arc::new(move |e: &A::Event| bus_for_listener.publish(e.clone()));
            extensions.event_listeners.push(listener);
        }

        Ok(CqrsTopology {
            name: self.name.unwrap_or_else(|| "cqrs".into()),
            factory,
            journal: self.journal,
            read_journal: self.read_journal,
            recovery_permits: self.recovery_permits,
            writer_uuid: self.writer_uuid,
            poll_interval: self.poll_interval,
            repo_timeout: self.repo_timeout,
            extensions,
            readers: self.readers,
            rebuild_contexts: self.rebuild_contexts,
            snapshot,
            shards: self.shards,
            event_codecs: self.event_codecs,
            reader_retry: self.reader_retry,
            event_bus: self.event_bus,
            dedupe_window: self.dedupe_window,
        })
    }
}

/// Inspectable description of a CQRS topology — actors not yet spawned,
/// readers not yet running. Call [`Topology::materialize`] to bring it
/// to life.
pub struct CqrsTopology<A, J>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
    J: Journal,
{
    name: String,
    factory: Arc<dyn Fn(<A as AggregateRoot>::Id) -> A + Send + Sync>,
    journal: Arc<J>,
    read_journal: Option<Arc<dyn ReadJournal>>,
    recovery_permits: usize,
    writer_uuid: String,
    poll_interval: Duration,
    repo_timeout: Duration,
    extensions: ExtensionSlots<A::Command, A::Event, A::Error>,
    readers: Vec<Box<dyn ErasedReader<A::Event>>>,
    rebuild_contexts: Vec<RebuildContext<A::Event>>,
    snapshot: Option<SnapshotConfig>,
    shards: usize,
    event_codecs: Option<Arc<EventCodecRegistry<A::Event>>>,
    reader_retry: Option<(u32, RetrySchedule)>,
    event_bus: Option<BusHandles<A::Event>>,
    dedupe_window: usize,
}

#[async_trait]
impl<A, J> Topology for CqrsTopology<A, J>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
    J: Journal,
{
    type Handles = CqrsHandles<A>;

    async fn materialize(self, system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
        // Capture-by-move into the Props factory. The actor restart
        // path will produce a fresh gateway with empty entity cache —
        // recovery refills state from the journal on demand.
        let factory = self.factory.clone();
        let journal = self.journal.clone();
        let permits = Arc::new(RecoveryPermitter::new(self.recovery_permits));
        let writer_uuid = self.writer_uuid.clone();
        let extensions = self.extensions.clone();
        let snapshot = self.snapshot.clone();
        let shards = self.shards.max(1);
        let dedupe_window = self.dedupe_window;

        let mut gateways: Vec<ActorRef<CommandEnvelope<A>>> = Vec::with_capacity(shards);
        for shard_idx in 0..shards {
            let factory = factory.clone();
            let journal = journal.clone();
            let permits = permits.clone();
            let writer_uuid = writer_uuid.clone();
            let extensions = extensions.clone();
            let snapshot = snapshot.clone();
            let actor_name =
                if shards == 1 { self.name.clone() } else { format!("{}-shard-{shard_idx}", self.name) };
            let actor_ref = system
                .actor_of(
                    Props::create(move || CommandGateway::<A, J> {
                        factory: factory.clone(),
                        journal: journal.clone(),
                        permits: permits.clone(),
                        writer_uuid: writer_uuid.clone(),
                        entities: HashMap::new(),
                        extensions: extensions.clone(),
                        snapshot: snapshot.clone(),
                        dedupe_window,
                    }),
                    &actor_name,
                )
                .map_err(|e| PatternError::Invariant(format!("spawn gateway: {e}")))?;
            gateways.push(actor_ref);
        }

        // Capture the read_journal early so we can both spawn readers
        // and build rebuild closures.
        let read_journal = self.read_journal.clone();
        // Spawn reader runners — one tokio task per reader. When an
        // event bus is wired, readers run in live-tail mode; otherwise
        // they poll the read journal.
        let bus = self.event_bus.clone();
        let codecs = self.event_codecs.clone();
        let retry_cfg = self.reader_retry;
        if !self.readers.is_empty() {
            let interval = self.poll_interval;
            if let Some(bus_handles) = bus {
                for spec in self.readers {
                    let rx = bus_handles.subscribe();
                    tokio::spawn(run_reader_live(spec, rx, retry_cfg));
                }
            } else if let Some(rj) = read_journal.clone() {
                for spec in self.readers {
                    let codecs = codecs.clone();
                    tokio::spawn(run_reader_poll(spec, rj.clone(), interval, codecs, retry_cfg));
                }
            } else {
                // build() rejects this combination, but surface a real
                // error rather than panicking if it ever slips through.
                return Err(PatternError::Invariant(
                    "readers configured without an event bus or a read journal".into(),
                ));
            }
        }

        let repo: Arc<dyn Repository<Aggregate = A>> =
            Arc::new(ShardedRepository::<A> { gateways, timeout: self.repo_timeout });

        // Build rebuild closures for each registered reader. Rebuild
        // requires a read_journal; live-tail-only readers (no journal
        // configured) get a closure that returns an explanatory error.
        let mut rebuilds: HashMap<String, RebuildFn> = HashMap::new();
        let rebuild_journal = read_journal.clone();
        let rebuild_codecs = self.event_codecs.clone();
        for ctx in self.rebuild_contexts {
            let journal = rebuild_journal.clone();
            let codecs = rebuild_codecs.clone();
            let name = ctx.name.clone();
            let f: RebuildFn = Arc::new(move || {
                let ctx = RebuildContext {
                    name: ctx.name.clone(),
                    state_reset: ctx.state_reset.clone(),
                    apply: ctx.apply.clone(),
                    filter: ctx.filter.clone(),
                    offset: ctx.offset.clone(),
                };
                let journal = journal.clone();
                let codecs = codecs.clone();
                Box::pin(async move {
                    let Some(rj) = journal else {
                        return Err("rebuild_projection requires a read_journal".into());
                    };
                    rebuild_one_projection(ctx, rj, codecs).await
                })
            });
            rebuilds.insert(name, f);
        }

        Ok(CqrsHandles { repository: repo, rebuilds })
    }
}

async fn rebuild_one_projection<E: Send + Clone + 'static>(
    ctx: RebuildContext<E>,
    rj: Arc<dyn ReadJournal>,
    codecs: Option<Arc<EventCodecRegistry<E>>>,
) -> Result<(), String> {
    (ctx.state_reset)().await;
    let pids = match &ctx.filter {
        ReaderFilter::All | ReaderFilter::Tag(_) => {
            rj.all_persistence_ids().await.map_err(|e| format!("list pids: {e:?}"))?
        }
        ReaderFilter::PersistenceId(id) => vec![id.clone()],
        ReaderFilter::PersistenceIds(ids) => ids.clone(),
    };
    let mut max_seq: u64 = 0;
    for pid in pids {
        let events = rj
            .events_by_persistence_id(&pid, 1, u64::MAX)
            .await
            .map_err(|e| format!("read pid {pid}: {e:?}"))?;
        for env in events {
            if let ReaderFilter::Tag(t) = &ctx.filter {
                if !env.tags.iter().any(|x| x == t) {
                    continue;
                }
            }
            let decoded =
                codecs.as_ref().and_then(|r| r.decode(&env.manifest, &env.payload)).ok_or_else(|| {
                    format!("no decoder for manifest `{}` (configure EventCodecRegistry)", env.manifest)
                })?;
            let event = decoded?;
            (ctx.apply)(event).await.map_err(|e| format!("apply during rebuild: {e}"))?;
            if env.sequence_nr > max_seq {
                max_seq = env.sequence_nr;
            }
        }
    }
    ctx.offset.store(max_seq, Ordering::Release);
    Ok(())
}

/// Strongly-typed handles into a materialized CQRS instance.
pub struct CqrsHandles<A>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
{
    repository: Arc<dyn Repository<Aggregate = A>>,
    rebuilds: HashMap<String, RebuildFn>,
}

type RebuildFn = Arc<dyn Fn() -> BoxFuture<'static, Result<(), String>> + Send + Sync + 'static>;

impl<A> CqrsHandles<A>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
{
    /// Type-erased repository handle.
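    ///
    /// All command traffic flows through this handle: `send` routes
    /// each command to its owning gateway shard and returns the
    /// freshly persisted events (`OrderCommand` is hypothetical):
    ///
    /// ```ignore
    /// let repo = handles.repository();
    /// let events = repo.send(OrderCommand::Place { id, qty: 3 }).await?;
    /// ```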
    pub fn repository(&self) -> Arc<dyn Repository<Aggregate = A>> {
        self.repository.clone()
    }

    /// Reset and replay the projection associated with the named
    /// reader. Returns `Err` if no reader by that name was registered
    /// at build time, or if no [`atomr_persistence_query::ReadJournal`]
    /// is configured (live-tail-only readers can't be rebuilt — they
    /// have no journal to scan).
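    ///
    /// The name is whatever the reader's `name()` returned when it
    /// was registered ("order-totals" here is hypothetical):
    ///
    /// ```ignore
    /// handles.rebuild_projection("order-totals").await?;
    /// ```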
    pub async fn rebuild_projection(&self, name: &str) -> Result<(), String> {
        let f = self.rebuilds.get(name).ok_or_else(|| format!("no reader named `{name}`"))?.clone();
        f().await
    }
}

// ─── Implementation details below ──────────────────────────────────────

struct ShardedRepository<A>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
{
    gateways: Vec<ActorRef<CommandEnvelope<A>>>,
    timeout: Duration,
}

#[async_trait]
impl<A> Repository for ShardedRepository<A>
where
    A: AggregateRoot,
    A::Command: Command<AggregateId = <A as AggregateRoot>::Id>,
    A::Event: DomainEvent,
{
    type Aggregate = A;

    async fn send(&self, cmd: A::Command) -> Result<Vec<A::Event>, PatternError<A::Error>> {
        let id = cmd.aggregate_id();
        let idx = shard_index(&id, self.gateways.len());
        match self.gateways[idx].ask_with(|reply| CommandEnvelope { cmd, reply }, self.timeout).await {
            Ok(inner) => inner,
            Err(ask) => Err(PatternError::Ask(ask)),
        }
    }
}

fn shard_index<I: std::hash::Hash>(id: &I, n: usize) -> usize {
    use std::hash::Hasher;
    let mut h = std::collections::hash_map::DefaultHasher::new();
    id.hash(&mut h);
    (h.finish() as usize) % n.max(1)
}

trait ErasedReader<E>: Send + Sync + 'static {
    fn name(&self) -> String;
    fn filter(&self) -> ReaderFilter;
    fn offset(&self) -> Arc<AtomicU64>;
    fn decode_payload(&self, bytes: &[u8]) -> Result<E, String>;
    fn apply<'a>(&'a self, event: E) -> BoxFuture<'a, Result<(), String>>;
}

struct ReaderSpec<R: Reader> {
    reader: Arc<Mutex<R>>,
    state: Arc<RwLock<R::Projection>>,
    offset: Arc<AtomicU64>,
    name: String,
    filter: ReaderFilter,
}

impl<R: Reader> ErasedReader<R::Event> for ReaderSpec<R> {
    fn name(&self) -> String {
        self.name.clone()
    }
    fn filter(&self) -> ReaderFilter {
        self.filter.clone()
    }
    fn offset(&self) -> Arc<AtomicU64> {
        self.offset.clone()
    }
    fn decode_payload(&self, bytes: &[u8]) -> Result<R::Event, String> {
        R::decode(bytes)
    }
    fn apply<'a>(&'a self, event: R::Event) -> BoxFuture<'a, Result<(), String>> {
        let state = self.state.clone();
        let reader = self.reader.clone();
        Box::pin(async move {
            let mut state = state.write().await;
            let mut reader = reader.lock().await;
            reader.apply(&mut *state, event).await.map_err(|e| e.to_string())
        })
    }
}

type ResetFn = Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>;
type ApplyFn<E> = Arc<dyn Fn(E) -> BoxFuture<'static, Result<(), String>> + Send + Sync>;

/// Standalone reference to a reader's apply path used by rebuild
/// closures (which need access without going through the
/// `Box<dyn ErasedReader>` that lives inside the runner).
struct RebuildContext<E: Send + Clone + 'static> {
    name: String,
    state_reset: ResetFn,
    apply: ApplyFn<E>,
    filter: ReaderFilter,
    offset: Arc<AtomicU64>,
}

impl<R: Reader> ReaderSpec<R> {
    fn rebuild_context(&self) -> RebuildContext<R::Event> {
        let state = self.state.clone();
        let offset = self.offset.clone();
        let reader = self.reader.clone();
        let state_clone = state.clone();
        let offset_clone = offset.clone();
        let reader_clone = reader.clone();
        let state_reset: ResetFn = Arc::new(move || {
            let state = state_clone.clone();
            let offset = offset_clone.clone();
            Box::pin(async move {
                *state.write().await = R::Projection::default();
                offset.store(0, Ordering::Release);
            })
        });
        let apply: ApplyFn<R::Event> = Arc::new(move |event: R::Event| {
            let state = state.clone();
            let reader = reader_clone.clone();
            Box::pin(async move {
                let mut state = state.write().await;
                let mut reader = reader.lock().await;
                reader.apply(&mut *state, event).await.map_err(|e| e.to_string())
            })
        });
        RebuildContext { name: self.name.clone(), state_reset, apply, filter: self.filter.clone(), offset }
    }
}

async fn run_reader_poll<E: Send + Clone + 'static>(
    reader: Box<dyn ErasedReader<E>>,
    read_journal: Arc<dyn ReadJournal>,
    poll_interval: Duration,
    codecs: Option<Arc<EventCodecRegistry<E>>>,
    retry: Option<(u32, RetrySchedule)>,
) {
    let mut pid_offsets: HashMap<String, u64> = HashMap::new();
    let offset_handle = reader.offset();
    let filter = reader.filter();
    let name = reader.name();

    loop {
        let pids = match resolve_pids(&filter, &*read_journal).await {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!(reader = %name, error = ?e, "list pids failed");
                tokio::time::sleep(poll_interval).await;
                continue;
            }
        };

        let mut max_seq_seen = offset_handle.load(Ordering::Acquire);

        for pid in pids {
            let from = pid_offsets.get(&pid).copied().unwrap_or(0).saturating_add(1);
            let events = match read_journal.events_by_persistence_id(&pid, from, u64::MAX).await {
                Ok(e) => e,
                Err(e) => {
                    tracing::warn!(reader = %name, pid = %pid, error = ?e, "read failed");
                    continue;
                }
            };

            for env in events {
                if let ReaderFilter::Tag(t) = &filter {
                    if !env.tags.iter().any(|x| x == t) {
                        pid_offsets.insert(pid.clone(), env.sequence_nr);
                        continue;
                    }
                }

                let decoded = codecs
                    .as_ref()
                    .and_then(|r| r.decode(&env.manifest, &env.payload))
                    .unwrap_or_else(|| reader.decode_payload(&env.payload));

                match decoded {
                    Ok(event) => {
                        apply_with_retry(&*reader, event, retry, &name).await;
                        pid_offsets.insert(pid.clone(), env.sequence_nr);
                        if env.sequence_nr > max_seq_seen {
                            max_seq_seen = env.sequence_nr;
                        }
                    }
                    Err(s) => {
                        tracing::warn!(reader = %name, error = %s, "decode failed");
                        pid_offsets.insert(pid.clone(), env.sequence_nr);
                    }
                }
            }
        }

        offset_handle.store(max_seq_seen, Ordering::Release);
        tokio::time::sleep(poll_interval).await;
    }
}

async fn run_reader_live<E: Send + Clone + 'static>(
    reader: Box<dyn ErasedReader<E>>,
    mut rx: tokio::sync::mpsc::UnboundedReceiver<E>,
    retry: Option<(u32, RetrySchedule)>,
) {
    let name = reader.name();
    while let Some(event) = rx.recv().await {
        apply_with_retry(&*reader, event, retry, &name).await;
    }
}

async fn apply_with_retry<E: Send + Clone + 'static>(
    reader: &dyn ErasedReader<E>,
    event: E,
    retry: Option<(u32, RetrySchedule)>,
    name: &str,
) {
    let result = if let Some((max_attempts, sched)) = retry {
        let mut last: Option<String> = None;
        for attempt in 0..max_attempts {
            match reader.apply(event.clone()).await {
                Ok(()) => return,
                Err(e) => {
                    last = Some(e);
                    if attempt + 1 < max_attempts {
                        tokio::time::sleep(sched.delay_for(attempt)).await;
                    }
                }
            }
        }
        Err(last.unwrap_or_else(|| "unknown".into()))
    } else {
        reader.apply(event).await
    };
    if let Err(err) = result {
        tracing::warn!(reader = %name, error = %err, "apply failed (retries exhausted)");
    }
}

async fn resolve_pids(
    filter: &ReaderFilter,
    rj: &dyn ReadJournal,
) -> Result<Vec<String>, atomr_persistence::JournalError> {
    match filter {
        ReaderFilter::All | ReaderFilter::Tag(_) => rj.all_persistence_ids().await,
        ReaderFilter::PersistenceId(id) => Ok(vec![id.clone()]),
        ReaderFilter::PersistenceIds(ids) => Ok(ids.clone()),
    }
}

/// Time-based writer-id seed: nanoseconds since the Unix epoch,
/// hex-encoded. Unique enough to tag a writer per builder instance;
/// not cryptographically random despite the name.
fn rand_writer_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    format!("{nanos:x}")
}