Skip to main content

kuberic_core/replicator/
primary.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5use tokio_stream::StreamExt;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::transport::Channel;
8use tracing::{debug, warn};
9
10use crate::proto::ReplicationItem;
11use crate::proto::replicator_data_client::ReplicatorDataClient;
12use crate::replicator::quorum::QuorumTracker;
13use crate::types::{Epoch, Lsn, ReplicaId};
14
15/// A connection to one secondary replica. Uses an unbounded channel so
16/// send_to_all never blocks the actor. A background drain task bridges
17/// the unbounded channel to the bounded gRPC stream.
18struct SecondaryConnection {
19    /// Unbounded sender — send_to_all writes here, never blocks
20    item_tx: mpsc::UnboundedSender<ReplicationItem>,
21}
22
23/// Primary-side replication sender. Manages connections to all configured
24/// secondaries, sends operations, and routes ACKs back to the QuorumTracker.
25///
26/// **Non-blocking design (matching SF):** `send_to_all` enqueues ops into
27/// per-secondary unbounded channels and returns immediately. Each secondary
28/// has a background drain task that reads from the unbounded channel and
29/// writes to the gRPC stream. A slow secondary's drain task blocks
30/// independently without affecting the actor or other secondaries.
31///
32/// Pending ops for new replicas are replayed from the ReplicationQueue
33/// at `add_secondary` time — no build buffers needed.
34pub struct PrimarySender {
35    connections: HashMap<ReplicaId, SecondaryConnection>,
36    #[allow(dead_code)]
37    primary_id: ReplicaId,
38    epoch: Epoch,
39}
40
41impl PrimarySender {
42    pub fn new(primary_id: ReplicaId, epoch: Epoch) -> Self {
43        Self {
44            connections: HashMap::new(),
45            primary_id,
46            epoch,
47        }
48    }
49
50    pub fn set_epoch(&mut self, epoch: Epoch) {
51        self.epoch = epoch;
52    }
53
54    /// Connect to a secondary's replication gRPC endpoint.
55    /// If ops were buffered during build, they are replayed first.
56    ///
57    /// Spawns two background tasks per secondary:
58    /// 1. **Drain task**: reads from unbounded channel, writes to bounded
59    ///    gRPC stream. May block on slow secondary — only blocks this task.
60    /// 2. **ACK reader**: reads ACKs from gRPC response stream, routes to
61    ///    QuorumTracker.
62    pub async fn add_secondary(
63        &mut self,
64        replica_id: ReplicaId,
65        address: String,
66        quorum_tracker: Arc<tokio::sync::Mutex<QuorumTracker>>,
67        partition_state: Arc<crate::handles::PartitionState>,
68    ) -> crate::Result<()> {
69        if self.connections.contains_key(&replica_id) {
70            return Ok(()); // already connected
71        }
72
73        let channel = Channel::from_shared(address)
74            .map_err(|e| crate::KubericError::Internal(Box::new(e)))?
75            .connect()
76            .await
77            .map_err(|e| crate::KubericError::Internal(Box::new(e)))?;
78
79        let mut client = ReplicatorDataClient::new(channel);
80
81        // Bounded channel for the gRPC stream (backpressure at transport level)
82        let (grpc_tx, grpc_rx) = mpsc::channel::<ReplicationItem>(256);
83        let outbound = ReceiverStream::new(grpc_rx);
84
85        let response = client
86            .replication_stream(outbound)
87            .await
88            .map_err(|e| crate::KubericError::Internal(Box::new(e)))?;
89
90        let mut ack_stream = response.into_inner();
91        let rid = replica_id;
92
93        // Spawn ACK reader
94        let ps = partition_state;
95        tokio::spawn(async move {
96            while let Some(result) = ack_stream.next().await {
97                match result {
98                    Ok(ack) => {
99                        debug!(replica_id = rid, lsn = ack.lsn, "received ACK");
100                        let mut tracker = quorum_tracker.lock().await;
101                        tracker.ack(ack.lsn, rid);
102                        // Update PartitionState so committed_lsn is always fresh,
103                        // even without a new data_rx item to trigger the read.
104                        // This fixes the off-by-one where the last op's committed_lsn
105                        // was never propagated because no subsequent op triggered
106                        // a PartitionState refresh.
107                        ps.set_committed_lsn(tracker.committed_lsn());
108                    }
109                    Err(e) => {
110                        warn!(replica_id = rid, error = %e, "ACK stream error");
111                        break;
112                    }
113                }
114            }
115        });
116
117        // Unbounded channel: send_to_all writes here (never blocks)
118        let (unbounded_tx, mut unbounded_rx) = mpsc::unbounded_channel::<ReplicationItem>();
119
120        // Spawn drain task: bridges unbounded → bounded gRPC stream.
121        // This task may block on grpc_tx.send() if the secondary is slow,
122        // but that only blocks THIS task, not the actor.
123        tokio::spawn(async move {
124            while let Some(item) = unbounded_rx.recv().await {
125                if grpc_tx.send(item).await.is_err() {
126                    warn!(replica_id = rid, "gRPC stream closed, drain task exiting");
127                    break;
128                }
129            }
130        });
131
132        self.connections.insert(
133            replica_id,
134            SecondaryConnection {
135                item_tx: unbounded_tx,
136            },
137        );
138
139        Ok(())
140    }
141
142    /// Send a single item to a specific secondary (for replay from queue).
143    pub fn send_to_one(
144        &self,
145        replica_id: ReplicaId,
146        lsn: Lsn,
147        data: &bytes::Bytes,
148        committed_lsn: Lsn,
149    ) {
150        let item = ReplicationItem {
151            epoch_data_loss: self.epoch.data_loss_number,
152            epoch_config: self.epoch.configuration_number,
153            lsn,
154            data: data.to_vec(),
155            committed_lsn,
156        };
157        if let Some(conn) = self.connections.get(&replica_id)
158            && conn.item_tx.send(item).is_err()
159        {
160            warn!(replica_id, lsn, "send_to_one: channel closed");
161        }
162    }
163
164    /// Remove a secondary connection.
165    pub fn remove_secondary(&mut self, replica_id: ReplicaId) {
166        self.connections.remove(&replica_id);
167    }
168
169    /// Send an operation to all connected secondaries. Non-blocking —
170    /// uses unbounded channels. Matches SF's fire-and-forget dispatch.
171    pub fn send_to_all(&mut self, lsn: Lsn, data: &bytes::Bytes, committed_lsn: Lsn) {
172        let item = ReplicationItem {
173            epoch_data_loss: self.epoch.data_loss_number,
174            epoch_config: self.epoch.configuration_number,
175            lsn,
176            data: data.to_vec(),
177            committed_lsn,
178        };
179
180        let mut dead = Vec::new();
181        for (&rid, conn) in &self.connections {
182            if conn.item_tx.send(item.clone()).is_err() {
183                warn!(
184                    replica_id = rid,
185                    lsn, "secondary channel closed — removing connection"
186                );
187                dead.push(rid);
188            }
189        }
190        for rid in dead {
191            self.connections.remove(&rid);
192        }
193    }
194
195    /// Number of connected secondaries.
196    pub fn connection_count(&self) -> usize {
197        self.connections.len()
198    }
199
200    /// Check if a secondary is connected.
201    pub fn has_connection(&self, replica_id: &ReplicaId) -> bool {
202        self.connections.contains_key(replica_id)
203    }
204
205    /// Get all connected replica IDs.
206    pub fn connected_ids(&self) -> Vec<ReplicaId> {
207        self.connections.keys().cloned().collect()
208    }
209
210    /// Close all connections.
211    pub fn close_all(&mut self) {
212        self.connections.clear();
213    }
214}