Skip to main content

orlando_cluster/
replication_consumer.rs

//! Background task that polls the `ReplicationLog` for new entries and
//! applies them to a local replica store. Secondary clusters run one
//! consumer per replicated grain to maintain read-only copies.
//!
//! # Snapshot semantics (IMPORTANT)
//!
//! The consumer reads up to 100 entries per poll and **only applies the
//! latest one**; intermediate entries in the batch are dropped. This is safe
//! ONLY because every `ReplicationEntry` currently carries a complete
//! `ReplicationEntryType::FullState` snapshot. Each snapshot replaces the
//! replica state in full, so older snapshots in the same batch are
//! redundant.
//!
//! `ReplicationEntryType::Delta` is reserved for a future event-sourced
//! replication mode and is **not** supported by this consumer. If a
//! `Delta` entry is encountered, the consumer logs an error and skips it,
//! because applying only the latest delta would corrupt replica state.
//! Full delta replay (folding over every intermediate entry) is a separate feature.
19
20use std::sync::Arc;
21use std::time::Duration;
22
23use tokio::sync::watch;
24
25use orlando_core::ClusterId;
26use orlando_persistence::ReplicationLog;
27
28use crate::store_wrapper::ReplicaStore;
29
30/// Tracks replication state for a single grain on a secondary cluster.
31pub struct ReplicationConsumer {
32    grain_type: String,
33    grain_key: String,
34    #[allow(dead_code)]
35    local_cluster: ClusterId,
36    log: Arc<dyn ReplicationLog>,
37    replica_store: Arc<ReplicaStore>,
38    poll_interval: Duration,
39    shutdown_rx: watch::Receiver<bool>,
40}
41
42impl ReplicationConsumer {
43    pub fn new(
44        grain_type: String,
45        grain_key: String,
46        local_cluster: ClusterId,
47        log: Arc<dyn ReplicationLog>,
48        replica_store: Arc<ReplicaStore>,
49        poll_interval: Duration,
50        shutdown_rx: watch::Receiver<bool>,
51    ) -> Self {
52        Self {
53            grain_type,
54            grain_key,
55            local_cluster,
56            log,
57            replica_store,
58            poll_interval,
59            shutdown_rx,
60        }
61    }
62
63    /// Run the consumer loop. Call via `tokio::spawn`.
64    pub async fn run(mut self) {
65        let mut last_sequence = 0u64;
66
67        loop {
68            tokio::select! {
69                _ = tokio::time::sleep(self.poll_interval) => {}
70                _ = self.shutdown_rx.changed() => {
71                    tracing::debug!(
72                        grain_type = %self.grain_type,
73                        grain_key = %self.grain_key,
74                        "replication consumer shutting down"
75                    );
76                    return;
77                }
78            }
79
80            match self
81                .log
82                .read_from(&self.grain_type, &self.grain_key, last_sequence, 100)
83                .await
84            {
85                Ok(entries) if entries.is_empty() => {
86                    // No new entries, continue polling
87                }
88                Ok(entries) => {
89                    use orlando_core::replication::ReplicationEntryType;
90
91                    // Skip-to-latest is only sound for FullState snapshots — see module docs.
92                    // If any entry in the batch is a Delta, log an error and refuse the batch.
93                    if let Some(delta) = entries
94                        .iter()
95                        .find(|e| e.entry_type == ReplicationEntryType::Delta)
96                    {
97                        tracing::error!(
98                            grain_type = %delta.grain_type,
99                            grain_key = %delta.grain_key,
100                            sequence = delta.sequence,
101                            "replication consumer encountered Delta entry but only \
102                             FullState is supported; skipping batch to avoid corruption"
103                        );
104                        // Advance past the offending sequence so we don't loop forever.
105                        last_sequence = entries
106                            .last()
107                            .map(|e| e.sequence)
108                            .unwrap_or(last_sequence);
109                        continue;
110                    }
111
112                    for entry in &entries {
113                        tracing::debug!(
114                            grain_type = %entry.grain_type,
115                            grain_key = %entry.grain_key,
116                            sequence = entry.sequence,
117                            source = %entry.source_cluster,
118                            "replication consumer: applying entry"
119                        );
120                    }
121
122                    // Apply the latest full-state snapshot (intermediates are
123                    // redundant under FullState semantics).
124                    if let Some(latest) = entries.last() {
125                        self.replica_store.update(
126                            &self.grain_type,
127                            &self.grain_key,
128                            latest.payload.clone(),
129                            latest.sequence,
130                            latest.timestamp_millis,
131                        );
132                        last_sequence = latest.sequence;
133                    }
134                }
135                Err(e) => {
136                    tracing::warn!(
137                        grain_type = %self.grain_type,
138                        grain_key = %self.grain_key,
139                        error = %e,
140                        "replication consumer: failed to read from log"
141                    );
142                }
143            }
144        }
145    }
146}