1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
5
6use bytes::Bytes;
7use tokio::sync::{mpsc, oneshot};
8
9use crate::error::{KubericError, Result};
10use crate::events::ReplicateRequest;
11use crate::types::{AccessStatus, CancellationToken, FaultType, Lsn, ReplicaId};
12
13pub struct PartitionState {
25 read_status: AtomicU8,
26 write_status: AtomicU8,
27 current_progress: AtomicI64,
28 catch_up_capability: AtomicI64,
29 committed_lsn: AtomicI64,
30 copy_lsn_map: Mutex<HashMap<ReplicaId, Lsn>>,
31}
32
33impl PartitionState {
34 pub fn new() -> Self {
35 Self {
36 read_status: AtomicU8::new(AccessStatus::NotPrimary as u8),
37 write_status: AtomicU8::new(AccessStatus::NotPrimary as u8),
38 current_progress: AtomicI64::new(0),
39 catch_up_capability: AtomicI64::new(0),
40 committed_lsn: AtomicI64::new(0),
41 copy_lsn_map: Mutex::new(HashMap::new()),
42 }
43 }
44
45 pub fn read_status(&self) -> AccessStatus {
48 AccessStatus::from_u8(self.read_status.load(Ordering::Acquire))
49 }
50
51 pub fn write_status(&self) -> AccessStatus {
52 AccessStatus::from_u8(self.write_status.load(Ordering::Acquire))
53 }
54
55 pub fn current_progress(&self) -> Lsn {
56 self.current_progress.load(Ordering::Acquire)
57 }
58
59 pub fn catch_up_capability(&self) -> Lsn {
60 self.catch_up_capability.load(Ordering::Acquire)
61 }
62
63 pub fn committed_lsn(&self) -> Lsn {
64 self.committed_lsn.load(Ordering::Acquire)
65 }
66
67 pub fn set_read_status(&self, status: AccessStatus) {
70 self.read_status.store(status as u8, Ordering::Release);
71 }
72
73 pub fn set_write_status(&self, status: AccessStatus) {
74 self.write_status.store(status as u8, Ordering::Release);
75 }
76
77 pub fn set_current_progress(&self, lsn: Lsn) {
78 self.current_progress.store(lsn, Ordering::Release);
79 }
80
81 pub fn set_catch_up_capability(&self, lsn: Lsn) {
82 self.catch_up_capability.store(lsn, Ordering::Release);
83 }
84
85 pub fn set_committed_lsn(&self, lsn: Lsn) {
86 self.committed_lsn.store(lsn, Ordering::Release);
87 }
88
89 pub fn set_copy_lsn(&self, replica_id: ReplicaId, lsn: Lsn) {
92 self.copy_lsn_map.lock().unwrap().insert(replica_id, lsn);
93 }
94
95 pub fn take_copy_lsn(&self, replica_id: &ReplicaId) -> Option<Lsn> {
99 self.copy_lsn_map.lock().unwrap().remove(replica_id)
100 }
101}
102
103impl Default for PartitionState {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109pub struct PartitionHandle {
115 state: Arc<PartitionState>,
116 fault_tx: mpsc::Sender<FaultType>,
117}
118
119impl PartitionHandle {
120 pub fn new(state: Arc<PartitionState>, fault_tx: mpsc::Sender<FaultType>) -> Self {
121 Self { state, fault_tx }
122 }
123
124 pub fn read_status(&self) -> AccessStatus {
126 self.state.read_status()
127 }
128
129 pub fn write_status(&self) -> AccessStatus {
131 self.state.write_status()
132 }
133
134 pub fn report_fault(&self, fault_type: FaultType) {
136 let _ = self.fault_tx.try_send(fault_type);
137 }
138}
139
140#[derive(Clone)]
148pub struct StateReplicatorHandle {
149 data_tx: mpsc::Sender<ReplicateRequest>,
150 state: Arc<PartitionState>,
151}
152
153impl StateReplicatorHandle {
154 pub fn new(data_tx: mpsc::Sender<ReplicateRequest>, state: Arc<PartitionState>) -> Self {
155 Self { data_tx, state }
156 }
157
158 pub async fn replicate(&self, data: Bytes, token: CancellationToken) -> Result<Lsn> {
163 match self.state.write_status() {
165 AccessStatus::Granted => {}
166 AccessStatus::NotPrimary => return Err(KubericError::NotPrimary),
167 AccessStatus::NoWriteQuorum => return Err(KubericError::NoWriteQuorum),
168 AccessStatus::ReconfigurationPending => {
169 return Err(KubericError::ReconfigurationPending);
170 }
171 }
172
173 let (reply_tx, reply_rx) = oneshot::channel();
174 self.data_tx
175 .send(ReplicateRequest {
176 data,
177 reply: reply_tx,
178 })
179 .await
180 .map_err(|_| KubericError::Closed)?;
181
182 tokio::select! {
183 result = reply_rx => result.map_err(|_| KubericError::Closed)?,
184 _ = token.cancelled() => Err(KubericError::Cancelled),
185 }
186 }
187}