kuberic_core/replicator/
primary.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5use tokio_stream::StreamExt;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::transport::Channel;
8use tracing::{debug, warn};
9
10use crate::proto::ReplicationItem;
11use crate::proto::replicator_data_client::ReplicatorDataClient;
12use crate::replicator::quorum::QuorumTracker;
13use crate::types::{Epoch, Lsn, ReplicaId};
14
15struct SecondaryConnection {
19 item_tx: mpsc::UnboundedSender<ReplicationItem>,
21}
22
23pub struct PrimarySender {
35 connections: HashMap<ReplicaId, SecondaryConnection>,
36 #[allow(dead_code)]
37 primary_id: ReplicaId,
38 epoch: Epoch,
39}
40
41impl PrimarySender {
42 pub fn new(primary_id: ReplicaId, epoch: Epoch) -> Self {
43 Self {
44 connections: HashMap::new(),
45 primary_id,
46 epoch,
47 }
48 }
49
50 pub fn set_epoch(&mut self, epoch: Epoch) {
51 self.epoch = epoch;
52 }
53
54 pub async fn add_secondary(
63 &mut self,
64 replica_id: ReplicaId,
65 address: String,
66 quorum_tracker: Arc<tokio::sync::Mutex<QuorumTracker>>,
67 partition_state: Arc<crate::handles::PartitionState>,
68 ) -> crate::Result<()> {
69 if self.connections.contains_key(&replica_id) {
70 return Ok(()); }
72
73 let channel = Channel::from_shared(address)
74 .map_err(|e| crate::KubericError::Internal(Box::new(e)))?
75 .connect()
76 .await
77 .map_err(|e| crate::KubericError::Internal(Box::new(e)))?;
78
79 let mut client = ReplicatorDataClient::new(channel);
80
81 let (grpc_tx, grpc_rx) = mpsc::channel::<ReplicationItem>(256);
83 let outbound = ReceiverStream::new(grpc_rx);
84
85 let response = client
86 .replication_stream(outbound)
87 .await
88 .map_err(|e| crate::KubericError::Internal(Box::new(e)))?;
89
90 let mut ack_stream = response.into_inner();
91 let rid = replica_id;
92
93 let ps = partition_state;
95 tokio::spawn(async move {
96 while let Some(result) = ack_stream.next().await {
97 match result {
98 Ok(ack) => {
99 debug!(replica_id = rid, lsn = ack.lsn, "received ACK");
100 let mut tracker = quorum_tracker.lock().await;
101 tracker.ack(ack.lsn, rid);
102 ps.set_committed_lsn(tracker.committed_lsn());
108 }
109 Err(e) => {
110 warn!(replica_id = rid, error = %e, "ACK stream error");
111 break;
112 }
113 }
114 }
115 });
116
117 let (unbounded_tx, mut unbounded_rx) = mpsc::unbounded_channel::<ReplicationItem>();
119
120 tokio::spawn(async move {
124 while let Some(item) = unbounded_rx.recv().await {
125 if grpc_tx.send(item).await.is_err() {
126 warn!(replica_id = rid, "gRPC stream closed, drain task exiting");
127 break;
128 }
129 }
130 });
131
132 self.connections.insert(
133 replica_id,
134 SecondaryConnection {
135 item_tx: unbounded_tx,
136 },
137 );
138
139 Ok(())
140 }
141
142 pub fn send_to_one(
144 &self,
145 replica_id: ReplicaId,
146 lsn: Lsn,
147 data: &bytes::Bytes,
148 committed_lsn: Lsn,
149 ) {
150 let item = ReplicationItem {
151 epoch_data_loss: self.epoch.data_loss_number,
152 epoch_config: self.epoch.configuration_number,
153 lsn,
154 data: data.to_vec(),
155 committed_lsn,
156 };
157 if let Some(conn) = self.connections.get(&replica_id)
158 && conn.item_tx.send(item).is_err()
159 {
160 warn!(replica_id, lsn, "send_to_one: channel closed");
161 }
162 }
163
164 pub fn remove_secondary(&mut self, replica_id: ReplicaId) {
166 self.connections.remove(&replica_id);
167 }
168
169 pub fn send_to_all(&mut self, lsn: Lsn, data: &bytes::Bytes, committed_lsn: Lsn) {
172 let item = ReplicationItem {
173 epoch_data_loss: self.epoch.data_loss_number,
174 epoch_config: self.epoch.configuration_number,
175 lsn,
176 data: data.to_vec(),
177 committed_lsn,
178 };
179
180 let mut dead = Vec::new();
181 for (&rid, conn) in &self.connections {
182 if conn.item_tx.send(item.clone()).is_err() {
183 warn!(
184 replica_id = rid,
185 lsn, "secondary channel closed — removing connection"
186 );
187 dead.push(rid);
188 }
189 }
190 for rid in dead {
191 self.connections.remove(&rid);
192 }
193 }
194
195 pub fn connection_count(&self) -> usize {
197 self.connections.len()
198 }
199
200 pub fn has_connection(&self, replica_id: &ReplicaId) -> bool {
202 self.connections.contains_key(replica_id)
203 }
204
205 pub fn connected_ids(&self) -> Vec<ReplicaId> {
207 self.connections.keys().cloned().collect()
208 }
209
210 pub fn close_all(&mut self) {
212 self.connections.clear();
213 }
214}