use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use orlando_core::ClusterId;
use orlando_persistence::ReplicationLog;
use crate::store_wrapper::ReplicaStore;
pub struct ReplicationConsumer {
grain_type: String,
grain_key: String,
#[allow(dead_code)]
local_cluster: ClusterId,
log: Arc<dyn ReplicationLog>,
replica_store: Arc<ReplicaStore>,
poll_interval: Duration,
shutdown_rx: watch::Receiver<bool>,
}
impl ReplicationConsumer {
pub fn new(
grain_type: String,
grain_key: String,
local_cluster: ClusterId,
log: Arc<dyn ReplicationLog>,
replica_store: Arc<ReplicaStore>,
poll_interval: Duration,
shutdown_rx: watch::Receiver<bool>,
) -> Self {
Self {
grain_type,
grain_key,
local_cluster,
log,
replica_store,
poll_interval,
shutdown_rx,
}
}
pub async fn run(mut self) {
let mut last_sequence = 0u64;
loop {
tokio::select! {
_ = tokio::time::sleep(self.poll_interval) => {}
_ = self.shutdown_rx.changed() => {
tracing::debug!(
grain_type = %self.grain_type,
grain_key = %self.grain_key,
"replication consumer shutting down"
);
return;
}
}
match self
.log
.read_from(&self.grain_type, &self.grain_key, last_sequence, 100)
.await
{
Ok(entries) if entries.is_empty() => {
}
Ok(entries) => {
use orlando_core::replication::ReplicationEntryType;
if let Some(delta) = entries
.iter()
.find(|e| e.entry_type == ReplicationEntryType::Delta)
{
tracing::error!(
grain_type = %delta.grain_type,
grain_key = %delta.grain_key,
sequence = delta.sequence,
"replication consumer encountered Delta entry but only \
FullState is supported; skipping batch to avoid corruption"
);
last_sequence = entries
.last()
.map(|e| e.sequence)
.unwrap_or(last_sequence);
continue;
}
for entry in &entries {
tracing::debug!(
grain_type = %entry.grain_type,
grain_key = %entry.grain_key,
sequence = entry.sequence,
source = %entry.source_cluster,
"replication consumer: applying entry"
);
}
if let Some(latest) = entries.last() {
self.replica_store.update(
&self.grain_type,
&self.grain_key,
latest.payload.clone(),
latest.sequence,
latest.timestamp_millis,
);
last_sequence = latest.sequence;
}
}
Err(e) => {
tracing::warn!(
grain_type = %self.grain_type,
grain_key = %self.grain_key,
error = %e,
"replication consumer: failed to read from log"
);
}
}
}
}
}