Skip to main content

kuberic_core/
handles.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
5
6use bytes::Bytes;
7use tokio::sync::{mpsc, oneshot};
8
9use crate::error::{KubericError, Result};
10use crate::events::ReplicateRequest;
11use crate::types::{AccessStatus, CancellationToken, FaultType, Lsn, ReplicaId};
12
13// ---------------------------------------------------------------------------
14// PartitionState — shared atomics written by replicator, read by handles
15// ---------------------------------------------------------------------------
16
17/// Shared partition state. Written by the replicator actor on the control
18/// channel, read lock-free by PartitionHandle and StateReplicatorHandle.
19///
20/// `copy_lsn_map` stores per-replica copy boundary LSNs. Written by
21/// `run_build_replica_copy` after snapshotting state, read by the actor
22/// at `UpdateCatchUpConfiguration` to replay only ops beyond the copy
23/// boundary (matching SF's gap-free build approach).
24pub struct PartitionState {
25    read_status: AtomicU8,
26    write_status: AtomicU8,
27    current_progress: AtomicI64,
28    catch_up_capability: AtomicI64,
29    committed_lsn: AtomicI64,
30    copy_lsn_map: Mutex<HashMap<ReplicaId, Lsn>>,
31}
32
33impl PartitionState {
34    pub fn new() -> Self {
35        Self {
36            read_status: AtomicU8::new(AccessStatus::NotPrimary as u8),
37            write_status: AtomicU8::new(AccessStatus::NotPrimary as u8),
38            current_progress: AtomicI64::new(0),
39            catch_up_capability: AtomicI64::new(0),
40            committed_lsn: AtomicI64::new(0),
41            copy_lsn_map: Mutex::new(HashMap::new()),
42        }
43    }
44
45    // --- Reads (lock-free, used by handles and runtime) ---
46
47    pub fn read_status(&self) -> AccessStatus {
48        AccessStatus::from_u8(self.read_status.load(Ordering::Acquire))
49    }
50
51    pub fn write_status(&self) -> AccessStatus {
52        AccessStatus::from_u8(self.write_status.load(Ordering::Acquire))
53    }
54
55    pub fn current_progress(&self) -> Lsn {
56        self.current_progress.load(Ordering::Acquire)
57    }
58
59    pub fn catch_up_capability(&self) -> Lsn {
60        self.catch_up_capability.load(Ordering::Acquire)
61    }
62
63    pub fn committed_lsn(&self) -> Lsn {
64        self.committed_lsn.load(Ordering::Acquire)
65    }
66
67    // --- Writes (called by replicator actor only) ---
68
69    pub fn set_read_status(&self, status: AccessStatus) {
70        self.read_status.store(status as u8, Ordering::Release);
71    }
72
73    pub fn set_write_status(&self, status: AccessStatus) {
74        self.write_status.store(status as u8, Ordering::Release);
75    }
76
77    pub fn set_current_progress(&self, lsn: Lsn) {
78        self.current_progress.store(lsn, Ordering::Release);
79    }
80
81    pub fn set_catch_up_capability(&self, lsn: Lsn) {
82        self.catch_up_capability.store(lsn, Ordering::Release);
83    }
84
85    pub fn set_committed_lsn(&self, lsn: Lsn) {
86        self.committed_lsn.store(lsn, Ordering::Release);
87    }
88
89    /// Record the copy snapshot LSN for a replica being built.
90    /// Called by `run_build_replica_copy` after collecting state.
91    pub fn set_copy_lsn(&self, replica_id: ReplicaId, lsn: Lsn) {
92        self.copy_lsn_map.lock().unwrap().insert(replica_id, lsn);
93    }
94
95    /// Take (read and remove) the copy snapshot LSN for a replica.
96    /// Called by the actor at UpdateCatchUpConfiguration to determine
97    /// the precise replay boundary.
98    pub fn take_copy_lsn(&self, replica_id: &ReplicaId) -> Option<Lsn> {
99        self.copy_lsn_map.lock().unwrap().remove(replica_id)
100    }
101}
102
103impl Default for PartitionState {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109// ---------------------------------------------------------------------------
110// PartitionHandle — user-facing, reads from PartitionState atomics
111// ---------------------------------------------------------------------------
112
113/// User-facing partition handle. Reads access status from shared atomics.
114pub struct PartitionHandle {
115    state: Arc<PartitionState>,
116    fault_tx: mpsc::Sender<FaultType>,
117}
118
119impl PartitionHandle {
120    pub fn new(state: Arc<PartitionState>, fault_tx: mpsc::Sender<FaultType>) -> Self {
121        Self { state, fault_tx }
122    }
123
124    /// Current read access status. Check before serving read requests.
125    pub fn read_status(&self) -> AccessStatus {
126        self.state.read_status()
127    }
128
129    /// Current write access status. Check before serving write requests.
130    pub fn write_status(&self) -> AccessStatus {
131        self.state.write_status()
132    }
133
134    /// Report a fault to trigger failover or restart.
135    pub fn report_fault(&self, fault_type: FaultType) {
136        let _ = self.fault_tx.try_send(fault_type);
137    }
138}
139
140// ---------------------------------------------------------------------------
141// StateReplicatorHandle — user-facing, sends to replicator data channel
142// ---------------------------------------------------------------------------
143
144/// User-facing write handle. Backed by the replicator's data channel.
145/// Includes fast-path access status check to avoid channel round-trip
146/// when writes are not allowed.
147#[derive(Clone)]
148pub struct StateReplicatorHandle {
149    data_tx: mpsc::Sender<ReplicateRequest>,
150    state: Arc<PartitionState>,
151}
152
153impl StateReplicatorHandle {
154    pub fn new(data_tx: mpsc::Sender<ReplicateRequest>, state: Arc<PartitionState>) -> Self {
155        Self { data_tx, state }
156    }
157
158    /// Replicate data to quorum. Returns the assigned LSN.
159    ///
160    /// Fast-path: checks write status before sending to the replicator.
161    /// The replicator also checks internally (authoritative gate).
162    pub async fn replicate(&self, data: Bytes, token: CancellationToken) -> Result<Lsn> {
163        // Fast-path access status check
164        match self.state.write_status() {
165            AccessStatus::Granted => {}
166            AccessStatus::NotPrimary => return Err(KubericError::NotPrimary),
167            AccessStatus::NoWriteQuorum => return Err(KubericError::NoWriteQuorum),
168            AccessStatus::ReconfigurationPending => {
169                return Err(KubericError::ReconfigurationPending);
170            }
171        }
172
173        let (reply_tx, reply_rx) = oneshot::channel();
174        self.data_tx
175            .send(ReplicateRequest {
176                data,
177                reply: reply_tx,
178            })
179            .await
180            .map_err(|_| KubericError::Closed)?;
181
182        tokio::select! {
183            result = reply_rx => result.map_err(|_| KubericError::Closed)?,
184            _ = token.cancelled() => Err(KubericError::Cancelled),
185        }
186    }
187}