orlando_cluster/replication_consumer.rs
1//! Background task that polls the `ReplicationLog` for new entries and
2//! applies them to a local replica store. Secondary clusters run one
3//! consumer per replicated grain to maintain read-only copies.
4//!
5//! # Snapshot semantics (IMPORTANT)
6//!
7//! The consumer reads up to 100 entries per poll and **only applies the
8//! latest one** — intermediates in the batch are dropped. This is safe
9//! ONLY because today every `ReplicationEntry` carries a complete
10//! `ReplicationEntryType::FullState` snapshot. Each snapshot replaces the
11//! replica state in full, so older snapshots in the same batch are
12//! redundant.
13//!
14//! `ReplicationEntryType::Delta` is reserved for a future event-sourced
15//! replication mode; it is **not** supported by this consumer. If a
16//! `Delta` entry is encountered the consumer logs an error and skips it,
17//! because applying only the latest delta would corrupt replica state.
18//! Full delta replay (fold over intermediates) is a separate feature.
19
20use std::sync::Arc;
21use std::time::Duration;
22
23use tokio::sync::watch;
24
25use orlando_core::ClusterId;
26use orlando_persistence::ReplicationLog;
27
28use crate::store_wrapper::ReplicaStore;
29
30/// Tracks replication state for a single grain on a secondary cluster.
31pub struct ReplicationConsumer {
32 grain_type: String,
33 grain_key: String,
34 #[allow(dead_code)]
35 local_cluster: ClusterId,
36 log: Arc<dyn ReplicationLog>,
37 replica_store: Arc<ReplicaStore>,
38 poll_interval: Duration,
39 shutdown_rx: watch::Receiver<bool>,
40}
41
42impl ReplicationConsumer {
43 pub fn new(
44 grain_type: String,
45 grain_key: String,
46 local_cluster: ClusterId,
47 log: Arc<dyn ReplicationLog>,
48 replica_store: Arc<ReplicaStore>,
49 poll_interval: Duration,
50 shutdown_rx: watch::Receiver<bool>,
51 ) -> Self {
52 Self {
53 grain_type,
54 grain_key,
55 local_cluster,
56 log,
57 replica_store,
58 poll_interval,
59 shutdown_rx,
60 }
61 }
62
63 /// Run the consumer loop. Call via `tokio::spawn`.
64 pub async fn run(mut self) {
65 let mut last_sequence = 0u64;
66
67 loop {
68 tokio::select! {
69 _ = tokio::time::sleep(self.poll_interval) => {}
70 _ = self.shutdown_rx.changed() => {
71 tracing::debug!(
72 grain_type = %self.grain_type,
73 grain_key = %self.grain_key,
74 "replication consumer shutting down"
75 );
76 return;
77 }
78 }
79
80 match self
81 .log
82 .read_from(&self.grain_type, &self.grain_key, last_sequence, 100)
83 .await
84 {
85 Ok(entries) if entries.is_empty() => {
86 // No new entries, continue polling
87 }
88 Ok(entries) => {
89 use orlando_core::replication::ReplicationEntryType;
90
91 // Skip-to-latest is only sound for FullState snapshots — see module docs.
92 // If any entry in the batch is a Delta, log an error and refuse the batch.
93 if let Some(delta) = entries
94 .iter()
95 .find(|e| e.entry_type == ReplicationEntryType::Delta)
96 {
97 tracing::error!(
98 grain_type = %delta.grain_type,
99 grain_key = %delta.grain_key,
100 sequence = delta.sequence,
101 "replication consumer encountered Delta entry but only \
102 FullState is supported; skipping batch to avoid corruption"
103 );
104 // Advance past the offending sequence so we don't loop forever.
105 last_sequence = entries
106 .last()
107 .map(|e| e.sequence)
108 .unwrap_or(last_sequence);
109 continue;
110 }
111
112 for entry in &entries {
113 tracing::debug!(
114 grain_type = %entry.grain_type,
115 grain_key = %entry.grain_key,
116 sequence = entry.sequence,
117 source = %entry.source_cluster,
118 "replication consumer: applying entry"
119 );
120 }
121
122 // Apply the latest full-state snapshot (intermediates are
123 // redundant under FullState semantics).
124 if let Some(latest) = entries.last() {
125 self.replica_store.update(
126 &self.grain_type,
127 &self.grain_key,
128 latest.payload.clone(),
129 latest.sequence,
130 latest.timestamp_millis,
131 );
132 last_sequence = latest.sequence;
133 }
134 }
135 Err(e) => {
136 tracing::warn!(
137 grain_type = %self.grain_type,
138 grain_key = %self.grain_key,
139 error = %e,
140 "replication consumer: failed to read from log"
141 );
142 }
143 }
144 }
145 }
146}