1#[cfg(test)]
14use std::collections::{HashMap, HashSet};
15#[cfg(test)]
16use std::sync::Mutex;
17
18use crate::error::{ArrayError, ArrayResult};
19use crate::sync::hlc::Hlc;
20use crate::sync::op::{ArrayOp, ArrayOpKind};
21use crate::types::coord::value::CoordValue;
22
23#[derive(Clone, Debug, PartialEq)]
27pub enum ApplyOutcome {
28 Idempotent,
30 Applied,
32 Rejected(ApplyRejection),
34}
35
36#[derive(Clone, Debug, PartialEq)]
38pub enum ApplyRejection {
39 SchemaTooNew {
43 local: Hlc,
45 op: Hlc,
47 },
48 ArrayUnknown {
52 name: String,
54 },
55 ShapeInvalid {
57 detail: String,
59 },
60 EngineRejected {
67 detail: String,
69 },
70}
71
72pub trait ApplyEngine {
80 fn schema_hlc(&self, array: &str) -> ArrayResult<Option<Hlc>>;
83
84 fn already_seen(&self, array: &str, hlc: Hlc) -> ArrayResult<bool>;
88
89 fn apply_put(&mut self, op: &ArrayOp) -> ArrayResult<()>;
91
92 fn apply_delete(&mut self, op: &ArrayOp) -> ArrayResult<()>;
94
95 fn apply_erase(&mut self, op: &ArrayOp) -> ArrayResult<()>;
97
98 fn invalidate_tile(&mut self, array: &str, coord: &[CoordValue]) -> ArrayResult<()>;
103}
104
105pub fn apply_op<E: ApplyEngine>(engine: &mut E, op: &ArrayOp) -> ArrayResult<ApplyOutcome> {
120 if let Err(e) = op.validate_shape() {
122 return Ok(ApplyOutcome::Rejected(ApplyRejection::ShapeInvalid {
123 detail: e.to_string(),
124 }));
125 }
126
127 match engine.schema_hlc(&op.header.array)? {
129 None => {
130 return Ok(ApplyOutcome::Rejected(ApplyRejection::ArrayUnknown {
131 name: op.header.array.clone(),
132 }));
133 }
134 Some(local_schema) if op.header.schema_hlc > local_schema => {
135 return Ok(ApplyOutcome::Rejected(ApplyRejection::SchemaTooNew {
136 local: local_schema,
137 op: op.header.schema_hlc,
138 }));
139 }
140 Some(_) => {}
141 }
142
143 if engine.already_seen(&op.header.array, op.header.hlc)? {
145 return Ok(ApplyOutcome::Idempotent);
146 }
147
148 let dispatch_result = match op.kind {
150 ArrayOpKind::Put => engine.apply_put(op),
151 ArrayOpKind::Delete => engine.apply_delete(op),
152 ArrayOpKind::Erase => engine.apply_erase(op),
153 };
154
155 if let Err(e) = dispatch_result {
156 match &e {
158 ArrayError::SegmentCorruption { .. } | ArrayError::HlcLockPoisoned => return Err(e),
159 _ => {
160 return Ok(ApplyOutcome::Rejected(ApplyRejection::EngineRejected {
161 detail: e.to_string(),
162 }));
163 }
164 }
165 }
166
167 engine.invalidate_tile(&op.header.array, &op.coord)?;
169
170 Ok(ApplyOutcome::Applied)
172}
173
/// In-memory [`ApplyEngine`] used by the unit tests in this file.
#[cfg(test)]
pub struct MockEngine {
    /// Schema HLC registered per array name.
    schemas: HashMap<String, Hlc>,
    /// `(array, hlc bytes)` pairs recorded by successful `apply_*` calls.
    seen: HashSet<(String, [u8; 18])>,
    /// Every op accepted by an `apply_*` hook, in call order.
    pub applied: Vec<ArrayOp>,
    /// Every `(array, coord)` pair passed to `invalidate_tile`, in call order.
    pub invalidated: Vec<(String, Vec<CoordValue>)>,
    /// Error to return from the next `apply_*` call; taken (cleared) on use.
    reject_next_with: Mutex<Option<ArrayError>>,
}
192
#[cfg(test)]
impl MockEngine {
    /// Builds an engine with no registered arrays, no history and no pending
    /// injected error.
    pub fn new() -> Self {
        let schemas = HashMap::new();
        let seen = HashSet::new();
        Self {
            schemas,
            seen,
            applied: Vec::new(),
            invalidated: Vec::new(),
            reject_next_with: Mutex::new(None),
        }
    }

    /// Registers `array` with the given schema HLC, replacing any previous
    /// registration under the same name.
    pub fn register_array(&mut self, array: &str, schema_hlc: Hlc) {
        let name = array.to_string();
        self.schemas.insert(name, schema_hlc);
    }

    /// Arms the engine so that the next `apply_*` call fails with `err`.
    pub fn set_reject_next(&self, err: ArrayError) {
        *self.inject_slot() = Some(err);
    }

    /// Removes and returns the pending injected error, if any.
    fn take_inject(&self) -> Option<ArrayError> {
        self.inject_slot().take()
    }

    /// Locks the injected-error slot; panics if the mutex is poisoned.
    fn inject_slot(&self) -> std::sync::MutexGuard<'_, Option<ArrayError>> {
        self.reject_next_with
            .lock()
            .expect("invariant: MockEngine mutex is not poisoned")
    }
}
226
#[cfg(test)]
impl Default for MockEngine {
    /// Same as [`MockEngine::new`]: an empty engine with nothing registered.
    fn default() -> Self {
        MockEngine::new()
    }
}
233
#[cfg(test)]
impl MockEngine {
    /// Shared body of the three `apply_*` hooks: fail with the injected error
    /// if one is pending, otherwise mark the op as seen and record it.
    fn record_or_inject(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        match self.take_inject() {
            Some(injected) => Err(injected),
            None => {
                let key = (op.header.array.clone(), op.header.hlc.to_bytes());
                self.seen.insert(key);
                self.applied.push(op.clone());
                Ok(())
            }
        }
    }
}

#[cfg(test)]
impl ApplyEngine for MockEngine {
    /// Looks up the schema HLC registered for `array`, if any.
    fn schema_hlc(&self, array: &str) -> ArrayResult<Option<Hlc>> {
        let registered = self.schemas.get(array).copied();
        Ok(registered)
    }

    /// True when a previous `apply_*` call recorded this `(array, hlc)` pair.
    fn already_seen(&self, array: &str, hlc: Hlc) -> ArrayResult<bool> {
        let key = (array.to_string(), hlc.to_bytes());
        Ok(self.seen.contains(&key))
    }

    /// Records the op, or fails with the injected error if one is pending.
    fn apply_put(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_or_inject(op)
    }

    /// Records the op, or fails with the injected error if one is pending.
    fn apply_delete(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_or_inject(op)
    }

    /// Records the op, or fails with the injected error if one is pending.
    fn apply_erase(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_or_inject(op)
    }

    /// Appends the `(array, coord)` pair to `invalidated`; never fails.
    fn invalidate_tile(&mut self, array: &str, coord: &[CoordValue]) -> ArrayResult<()> {
        let entry = (array.to_string(), coord.to_vec());
        self.invalidated.push(entry);
        Ok(())
    }
}
279
/// Unit tests for [`apply_op`], driven through [`MockEngine`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// Fixed replica id shared by every HLC built in these tests.
    fn replica() -> ReplicaId {
        ReplicaId::new(42)
    }

    /// HLC at physical time `ms`, logical counter 0, on the test replica.
    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, replica()).expect("test HLC inputs are valid")
    }

    /// Op header targeting `array`, stamped at `op_ms` under schema `schema_ms`.
    fn header(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOpHeader {
        ArrayOpHeader {
            array: array.into(),
            hlc: hlc(op_ms),
            schema_hlc: hlc(schema_ms),
            valid_from_ms: 0,
            valid_until_ms: -1,
            // Test timestamps are tiny, so the u64 -> i64 cast cannot wrap.
            system_from_ms: op_ms as i64,
        }
    }

    /// Put op at coordinate `[Int64(1)]` carrying a single null attribute.
    fn put_op(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOp {
        ArrayOp {
            header: header(array, op_ms, schema_ms),
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(1)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    /// Delete op at coordinate `[Int64(1)]` with no attributes.
    fn delete_op(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOp {
        ArrayOp {
            header: header(array, op_ms, schema_ms),
            kind: ArrayOpKind::Delete,
            coord: vec![CoordValue::Int64(1)],
            attrs: None,
        }
    }

    #[test]
    fn apply_put_succeeds() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));

        let op = put_op("a", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(engine.applied.len(), 1);
        assert_eq!(engine.invalidated.len(), 1);
    }

    #[test]
    fn apply_idempotent_on_replay() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));

        let op = put_op("a", 50, 100);
        apply_op(&mut engine, &op).unwrap();
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Idempotent);
        assert_eq!(engine.applied.len(), 1);
        // The replay must not trigger a second tile invalidation either.
        assert_eq!(engine.invalidated.len(), 1);
    }

    #[test]
    fn apply_rejects_unknown_array() {
        let mut engine = MockEngine::new();
        let op = put_op("unknown", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::ArrayUnknown { name }) if name == "unknown"
        ));
    }

    #[test]
    fn apply_rejects_schema_too_new() {
        let mut engine = MockEngine::new();
        // Local schema is at 50; the op claims schema 100.
        engine.register_array("a", hlc(50));
        let op = put_op("a", 10, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::SchemaTooNew { local, op: op_hlc })
                if local == hlc(50) && op_hlc == hlc(100)
        ));
    }

    #[test]
    fn apply_rejects_invalid_shape() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        // A Put without attrs is expected to fail shape validation.
        let op = ArrayOp {
            header: header("a", 50, 100),
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(1)],
            attrs: None,
        };
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::ShapeInvalid { .. })
        ));
    }

    #[test]
    fn apply_wraps_engine_error_as_rejection() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        engine.set_reject_next(ArrayError::InvalidOp {
            detail: "simulated engine error".into(),
        });
        let op = put_op("a", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::EngineRejected { .. })
        ));
        // A rejected op must not invalidate any tile.
        assert!(engine.invalidated.is_empty());
    }

    #[test]
    fn apply_propagates_fatal_engine_error() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        engine.set_reject_next(ArrayError::HlcLockPoisoned);
        let op = put_op("a", 50, 100);

        // Fatal engine errors surface as `Err`, not as a rejection outcome.
        let result = apply_op(&mut engine, &op);
        assert!(matches!(result, Err(ArrayError::HlcLockPoisoned)));
        assert!(engine.applied.is_empty());
        assert!(engine.invalidated.is_empty());
    }

    #[test]
    fn apply_invalidates_tile_after_success() {
        let mut engine = MockEngine::new();
        engine.register_array("b", hlc(100));
        let op = delete_op("b", 30, 50);
        apply_op(&mut engine, &op).unwrap();
        assert_eq!(engine.invalidated.len(), 1);
        assert_eq!(engine.invalidated[0].0, "b");
        // The invalidation must carry the op's coordinate, not just its array.
        assert!(matches!(
            engine.invalidated[0].1.as_slice(),
            [CoordValue::Int64(1)]
        ));
    }
}