1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
/// Upper bound on the number of pull pages `SyncEngine::full_sync` will fetch in one call
/// before it gives up with a transport error.
const MAX_PULL_PAGES: usize = 10_000;
16
17#[derive(Debug)]
41pub struct SyncEngine {
42 config: SyncConfig,
43 state: SyncState,
44 outbox: Outbox,
45 buffer: EventBuffer,
46 resolver: ConflictResolver,
47 last_pulled_sequence: u64,
48 next_pull_cursor: Option<u64>,
49 initialized: bool,
50}
51
52impl SyncEngine {
53 pub fn new(config: SyncConfig) -> Result<Self, SyncError> {
60 Self::try_new(config)
61 }
62
63 pub fn with_strategy(
70 config: SyncConfig,
71 strategy: ConflictStrategy,
72 ) -> Result<Self, SyncError> {
73 Self::try_with_strategy(config, strategy)
74 }
75
76 pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
83 Self::try_with_strategy(config, ConflictStrategy::default())
84 }
85
86 pub fn try_with_strategy(
93 config: SyncConfig,
94 strategy: ConflictStrategy,
95 ) -> Result<Self, SyncError> {
96 config.validate()?;
97 let buffer_capacity = config.resolved_buffer_capacity();
98 let outbox = if let Some(path) = config.outbox_path.as_deref() {
99 Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
100 } else {
101 Outbox::new(config.resolved_outbox_capacity())
102 };
103
104 Ok(Self {
105 config,
106 state: SyncState::default(),
107 outbox,
108 buffer: EventBuffer::new(buffer_capacity),
109 resolver: ConflictResolver::new(strategy),
110 last_pulled_sequence: 0,
111 next_pull_cursor: None,
112 initialized: true,
113 })
114 }
115
116 pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
124 let seq = self.outbox.append(event)?;
125 self.state.local_head = seq;
126 self.state.pending_count = self.outbox.count();
127 Ok(seq)
128 }
129
130 pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
138 let batch_size = self.config.resolved_batch_size();
139 let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();
140
141 if events.is_empty() {
142 return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
143 }
144
145 let result = transport.push_events(&events).await?;
146 let accepted = result.accepted.min(events.len());
147
148 if accepted > 0 {
149 if let Err(err) = self.outbox.drain(accepted) {
150 self.state.pending_count = self.outbox.count();
151 return Err(err);
152 }
153 }
154
155 self.state.remote_head = result.remote_head;
156 self.state.last_push = Some(Utc::now());
157 self.state.pending_count = self.outbox.count();
158
159 Ok(result)
160 }
161
162 pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
172 let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
173 let (result, next_cursor) = self.pull_since(transport, since).await?;
174 self.next_pull_cursor = next_cursor;
175 Ok(result)
176 }
177
178 async fn pull_since(
179 &mut self,
180 transport: &dyn Transport,
181 since: u64,
182 ) -> Result<(PullResult, Option<u64>), SyncError> {
183 let limit = self.config.resolved_batch_size();
184 let PullPage { result, next_cursor: transport_next_cursor } =
185 transport.pull_events_page(since, limit).await?;
186
187 let pending: Vec<SyncEvent> =
189 self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
190 let mut drop_local_ids = HashSet::new();
191 let mut events_to_buffer = Vec::with_capacity(result.events.len());
192
193 for pulled_event in &result.events {
194 let mut keep_remote = true;
195
196 if let Some(local_event) = pending.iter().rev().find(|local_event| {
197 local_event.entity_type == pulled_event.entity_type
198 && local_event.entity_id == pulled_event.entity_id
199 }) {
200 match self.resolver.resolve(local_event, pulled_event) {
201 Resolution::KeepLocal => {
202 keep_remote = false;
203 }
204 Resolution::KeepRemote => {
205 drop_local_ids.insert(local_event.id);
206 }
207 Resolution::Merge(merged) => {
208 drop_local_ids.insert(local_event.id);
209 events_to_buffer.push(merged);
210 keep_remote = false;
211 }
212 }
213 }
214
215 if keep_remote {
216 events_to_buffer.push(pulled_event.clone());
217 }
218 }
219
220 if !drop_local_ids.is_empty() {
221 self.outbox.try_retain(|event| !drop_local_ids.contains(&event.id))?;
222 self.state.pending_count = self.outbox.count();
223 }
224
225 for event in events_to_buffer {
227 self.buffer.push(event);
228 }
229
230 self.state.remote_head = result.remote_head;
231 self.state.last_pull = Some(Utc::now());
232 let observed_cursor = transport_next_cursor
233 .filter(|cursor| *cursor > since)
234 .or_else(|| derive_next_cursor(since, &result.events))
235 .unwrap_or(since);
236 self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);
237
238 let next_cursor = Self::resolve_next_cursor(
239 since,
240 &result.events,
241 result.has_more,
242 transport_next_cursor,
243 )?;
244
245 Ok((result, next_cursor))
246 }
247
248 fn resolve_next_cursor(
249 since: u64,
250 events: &[SyncEvent],
251 has_more: bool,
252 transport_next_cursor: Option<u64>,
253 ) -> Result<Option<u64>, SyncError> {
254 if !has_more {
255 return Ok(None);
256 }
257
258 let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
259 let Some(next_cursor) = next_cursor else {
260 return Err(SyncError::Transport(
261 "pull pagination stalled: has_more=true but no advancing cursor available"
262 .to_string(),
263 ));
264 };
265 if next_cursor <= since {
266 return Err(SyncError::Transport(format!(
267 "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
268 )));
269 }
270 Ok(Some(next_cursor))
271 }
272
273 #[must_use]
275 pub fn status(&self) -> SyncStatus {
276 SyncStatus {
277 initialized: self.initialized,
278 local_head: self.state.local_head,
279 remote_head: self.state.remote_head,
280 pending: self.outbox.count(),
281 lag: self.state.lag(),
282 last_push: self.state.last_push,
283 last_pull: self.state.last_pull,
284 buffered_events: self.buffer.len(),
285 }
286 }
287
288 #[must_use]
290 pub fn pending_count(&self) -> usize {
291 self.outbox.count()
292 }
293
294 #[must_use]
296 pub fn buffered_count(&self) -> usize {
297 self.buffer.len()
298 }
299
300 pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
302 self.buffer.drain_all()
303 }
304
305 #[must_use]
307 pub const fn state(&self) -> &SyncState {
308 &self.state
309 }
310
311 #[must_use]
313 pub const fn config(&self) -> &SyncConfig {
314 &self.config
315 }
316
317 #[must_use]
319 pub const fn resolver(&self) -> &ConflictResolver {
320 &self.resolver
321 }
322
323 pub async fn full_sync(
329 &mut self,
330 transport: &dyn Transport,
331 ) -> Result<(PushResult, PullResult), SyncError> {
332 let push_result = self.push(transport).await?;
333 let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
334 let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
335 self.next_pull_cursor = next_cursor;
336 let mut pull_pages = 1;
337
338 while pull_result.has_more {
339 if pull_pages >= MAX_PULL_PAGES {
340 return Err(SyncError::Transport(
341 "pull pagination exceeded safety limit".to_string(),
342 ));
343 }
344
345 since = self.next_pull_cursor.ok_or_else(|| {
346 SyncError::Transport(
347 "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
348 )
349 })?;
350
351 let (next_page, page_cursor) = self.pull_since(transport, since).await?;
352 self.next_pull_cursor = page_cursor;
353
354 pull_result.events.extend(next_page.events);
355 pull_result.remote_head = next_page.remote_head;
356 pull_result.has_more = next_page.has_more;
357 pull_pages += 1;
358 }
359
360 self.next_pull_cursor = None;
361 Ok((push_result, pull_result))
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368 use crate::transport::NullTransport;
369 use proptest::prelude::*;
370 use serde_json::json;
371 use std::sync::Arc;
372 use std::sync::Mutex;
373 use std::sync::atomic::{AtomicU64, Ordering};
374 use tempfile::tempdir;
375
376 fn make_config() -> SyncConfig {
377 SyncConfig::new("agent-1", "tenant-1", "store-1")
378 }
379
380 fn make_event(event_type: &str) -> SyncEvent {
381 SyncEvent::new(event_type, "order", "ORD-1", json!({}))
382 }
383
/// A freshly built engine has nothing pending or buffered and reports `initialized`.
#[test]
fn new_engine() {
    let engine = SyncEngine::new(make_config()).unwrap();
    let status = engine.status();
    assert!(status.initialized);
    assert_eq!(engine.buffered_count(), 0);
    assert_eq!(engine.pending_count(), 0);
}
391
/// Recording one event yields sequence 1, one pending event and a local head of 1.
#[test]
fn record_event() {
    let mut engine = SyncEngine::new(make_config()).unwrap();
    let assigned = engine.record(make_event("order.created")).unwrap();
    assert_eq!(assigned, 1);
    assert_eq!(engine.state().local_head, 1);
    assert_eq!(engine.pending_count(), 1);
}
400
/// Recording three events leaves three pending and moves the local head to 3.
#[test]
fn record_multiple_events() {
    let mut engine = SyncEngine::new(make_config()).unwrap();
    for event_type in ["a", "b", "c"] {
        engine.record(make_event(event_type)).unwrap();
    }
    assert_eq!(engine.state().local_head, 3);
    assert_eq!(engine.pending_count(), 3);
}
410
411 #[tokio::test]
412 async fn push_with_null_transport() {
413 let mut engine = SyncEngine::new(make_config()).unwrap();
414 engine.record(make_event("a")).unwrap();
415 engine.record(make_event("b")).unwrap();
416
417 let transport = NullTransport::new();
418 let result = engine.push(&transport).await.unwrap();
419 assert_eq!(result.accepted, 2);
420 assert_eq!(engine.pending_count(), 0);
421 assert!(engine.state().last_push.is_some());
422 }
423
424 #[tokio::test]
425 async fn push_empty_outbox() {
426 let mut engine = SyncEngine::new(make_config()).unwrap();
427 let transport = NullTransport::new();
428 let result = engine.push(&transport).await.unwrap();
429 assert_eq!(result.accepted, 0);
430 }
431
432 #[tokio::test]
433 async fn pull_with_null_transport() {
434 let mut engine = SyncEngine::new(make_config()).unwrap();
435 let transport = NullTransport::new();
436 let result = engine.pull(&transport).await.unwrap();
437 assert!(result.events.is_empty());
438 assert!(!result.has_more);
439 assert!(engine.state().last_pull.is_some());
440 }
441
442 #[tokio::test]
443 async fn pull_buffers_events() {
444 #[derive(Debug)]
446 struct MockPullTransport {
447 events: Vec<SyncEvent>,
448 head: u64,
449 }
450
451 #[async_trait::async_trait]
452 impl Transport for MockPullTransport {
453 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
454 Ok(PushResult { accepted: events.len(), remote_head: self.head })
455 }
456 async fn pull_events(
457 &self,
458 _since: u64,
459 _limit: usize,
460 ) -> Result<PullResult, SyncError> {
461 Ok(PullResult {
462 events: self.events.clone(),
463 remote_head: self.head,
464 has_more: false,
465 })
466 }
467 }
468
469 let transport = MockPullTransport {
470 events: vec![
471 make_event("pulled-1").with_sequence(1),
472 make_event("pulled-2").with_sequence(2),
473 ],
474 head: 2,
475 };
476
477 let mut engine = SyncEngine::new(make_config()).unwrap();
478 let result = engine.pull(&transport).await.unwrap();
479 assert_eq!(result.events.len(), 2);
480 assert_eq!(engine.buffered_count(), 2);
481 assert_eq!(engine.state().remote_head, 2);
482 }
483
484 #[tokio::test]
485 async fn full_sync() {
486 let mut engine = SyncEngine::new(make_config()).unwrap();
487 engine.record(make_event("local")).unwrap();
488
489 let transport = NullTransport::new();
490 let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
491 assert_eq!(push_result.accepted, 1);
492 assert!(pull_result.events.is_empty());
493 assert_eq!(engine.pending_count(), 0);
494 }
495
/// `status` reflects two recorded but unpushed events.
#[test]
fn status_reporting() {
    let mut engine = SyncEngine::new(make_config()).unwrap();
    for event_type in ["a", "b"] {
        engine.record(make_event(event_type)).unwrap();
    }

    let snapshot = engine.status();

    assert!(snapshot.initialized);
    assert!(snapshot.last_push.is_none());
    assert_eq!(snapshot.local_head, 2);
    assert_eq!(snapshot.remote_head, 0);
    assert_eq!(snapshot.pending, 2);
    assert_eq!(snapshot.lag, 0);
}
510
/// Draining returns the buffered event and leaves the buffer empty.
#[test]
fn drain_buffer() {
    let mut engine = SyncEngine::new(make_config()).unwrap();
    // Seed the private buffer directly; no transport is involved in this test.
    engine.buffer.push(make_event("buffered"));
    assert_eq!(engine.buffered_count(), 1);

    let taken = engine.drain_buffer();

    assert_eq!(engine.buffered_count(), 0);
    assert_eq!(taken.len(), 1);
}
522
/// The strategy passed to `with_strategy` is the one the resolver reports.
#[test]
fn engine_with_strategy() {
    let chosen = ConflictStrategy::LocalWins;
    let engine = SyncEngine::with_strategy(make_config(), chosen).unwrap();
    assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
}
528
/// `config()` exposes the configuration the engine was built from.
#[test]
fn config_accessor() {
    let engine = SyncEngine::new(make_config()).unwrap();
    let agent_id = &engine.config().agent_id;
    assert_eq!(agent_id, "agent-1");
}
535
/// A configuration with an empty agent id is rejected by `try_new`.
#[test]
fn try_new_rejects_invalid_config() {
    let invalid = SyncConfig::new("", "tenant", "store");
    let outcome = SyncEngine::try_new(invalid);
    assert!(outcome.is_err());
}
541
/// Events recorded by one engine with an outbox path are pending again in a second
/// engine created with the same path.
#[test]
fn try_new_with_persistent_outbox_restores_pending_events() {
    let dir = tempdir().unwrap();
    let path_str = dir.path().join("sync-outbox.json").to_string_lossy().to_string();
    let config_for = |path: String| {
        SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path)
    };

    {
        // First engine lives only inside this scope and is dropped before reopening.
        let mut writer = SyncEngine::try_new(config_for(path_str.clone())).unwrap();
        writer.record(make_event("persisted-a")).unwrap();
        writer.record(make_event("persisted-b")).unwrap();
        assert_eq!(writer.pending_count(), 2);
    }

    let reopened = SyncEngine::try_new(config_for(path_str)).unwrap();
    assert_eq!(reopened.pending_count(), 2);
}
565
566 #[tokio::test]
567 async fn push_respects_batch_size() {
568 let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
569 let mut engine = SyncEngine::new(config).unwrap();
570 engine.record(make_event("a")).unwrap();
571 engine.record(make_event("b")).unwrap();
572 engine.record(make_event("c")).unwrap();
573
574 let transport = NullTransport::new();
575 let result = engine.push(&transport).await.unwrap();
576 assert_eq!(result.accepted, 2);
578 assert_eq!(engine.pending_count(), 1);
579 }
580
581 #[tokio::test]
582 async fn push_updates_state() {
583 #[derive(Debug)]
585 struct MockHeadTransport {
586 head: Arc<AtomicU64>,
587 }
588
589 #[async_trait::async_trait]
590 impl Transport for MockHeadTransport {
591 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
592 let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
593 + events.len() as u64;
594 Ok(PushResult { accepted: events.len(), remote_head: new_head })
595 }
596 async fn pull_events(
597 &self,
598 _since: u64,
599 _limit: usize,
600 ) -> Result<PullResult, SyncError> {
601 Ok(PullResult {
602 events: vec![],
603 remote_head: self.head.load(Ordering::SeqCst),
604 has_more: false,
605 })
606 }
607 }
608
609 let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };
610
611 let mut engine = SyncEngine::new(make_config()).unwrap();
612 engine.record(make_event("a")).unwrap();
613 engine.record(make_event("b")).unwrap();
614
615 let result = engine.push(&transport).await.unwrap();
616 assert_eq!(result.remote_head, 2);
617 assert_eq!(engine.state().remote_head, 2);
618 }
619
620 #[tokio::test]
621 async fn transport_error_propagates() {
622 #[derive(Debug)]
624 struct FailTransport;
625
626 #[async_trait::async_trait]
627 impl Transport for FailTransport {
628 async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
629 Err(SyncError::Transport("network down".into()))
630 }
631 async fn pull_events(
632 &self,
633 _since: u64,
634 _limit: usize,
635 ) -> Result<PullResult, SyncError> {
636 Err(SyncError::Transport("network down".into()))
637 }
638 }
639
640 let mut engine = SyncEngine::new(make_config()).unwrap();
641 engine.record(make_event("a")).unwrap();
642
643 let transport = FailTransport;
644 let result = engine.push(&transport).await;
645 assert!(result.is_err());
646 assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
647 assert_eq!(engine.pending_count(), 1);
649
650 let pull_result = engine.pull(&transport).await;
651 assert!(pull_result.is_err());
652 }
653
654 #[tokio::test]
655 async fn push_only_drains_accepted_events() {
656 #[derive(Debug)]
658 struct PartialAcceptTransport;
659
660 #[async_trait::async_trait]
661 impl Transport for PartialAcceptTransport {
662 async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
663 Ok(PushResult { accepted: 1, remote_head: 1 })
664 }
665
666 async fn pull_events(
667 &self,
668 _since: u64,
669 _limit: usize,
670 ) -> Result<PullResult, SyncError> {
671 Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
672 }
673 }
674
675 let mut engine = SyncEngine::new(make_config()).unwrap();
676 engine.record(make_event("a")).unwrap();
677 engine.record(make_event("b")).unwrap();
678 engine.record(make_event("c")).unwrap();
679
680 let result = engine.push(&PartialAcceptTransport).await.unwrap();
681 assert_eq!(result.accepted, 1);
682 assert_eq!(engine.pending_count(), 2);
683 }
684
685 #[tokio::test]
686 async fn push_returns_storage_error_when_ack_persist_fails() {
687 #[derive(Debug)]
688 struct AcceptAllTransport;
689
690 #[async_trait::async_trait]
691 impl Transport for AcceptAllTransport {
692 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
693 Ok(PushResult { accepted: events.len(), remote_head: events.len() as u64 })
694 }
695
696 async fn pull_events(
697 &self,
698 _since: u64,
699 _limit: usize,
700 ) -> Result<PullResult, SyncError> {
701 Ok(PullResult { events: vec![], remote_head: 0, has_more: false })
702 }
703 }
704
705 let dir = tempfile::tempdir().unwrap();
706 let path = dir.path().join("outbox.json");
707 let config = make_config().with_outbox_path(path.to_string_lossy().into_owned());
708 let mut engine = SyncEngine::new(config).unwrap();
709 engine.record(make_event("a")).unwrap();
710
711 std::fs::remove_file(&path).unwrap();
712 std::fs::create_dir(&path).unwrap();
713
714 let err = engine.push(&AcceptAllTransport).await.unwrap_err();
715 assert!(matches!(err, SyncError::Storage(_)));
716 assert_eq!(engine.pending_count(), 1);
717 assert_eq!(engine.state().remote_head, 0);
718 assert!(engine.state().last_push.is_none());
719 }
720
721 #[tokio::test]
722 async fn pull_conflict_resolution() {
723 #[derive(Debug)]
725 struct ConflictTransport;
726
727 #[async_trait::async_trait]
728 impl Transport for ConflictTransport {
729 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
730 Ok(PushResult { accepted: events.len(), remote_head: 10 })
731 }
732 async fn pull_events(
733 &self,
734 _since: u64,
735 _limit: usize,
736 ) -> Result<PullResult, SyncError> {
737 let remote_event =
739 SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
740 .with_sequence(5);
741 Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
742 }
743 }
744
745 let mut engine =
746 SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
747 engine
748 .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
749 .unwrap();
750
751 let transport = ConflictTransport;
752 let result = engine.pull(&transport).await.unwrap();
753 assert_eq!(result.events.len(), 1);
754 assert_eq!(engine.buffered_count(), 1);
756 assert_eq!(engine.pending_count(), 0);
757 }
758
759 #[tokio::test]
760 async fn pull_conflict_remote_wins_only_drops_latest_pending_event() {
761 #[derive(Debug)]
762 struct ConflictTransport;
763
764 #[async_trait::async_trait]
765 impl Transport for ConflictTransport {
766 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
767 Ok(PushResult { accepted: events.len(), remote_head: 10 })
768 }
769
770 async fn pull_events(
771 &self,
772 _since: u64,
773 _limit: usize,
774 ) -> Result<PullResult, SyncError> {
775 let remote_event =
776 SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
777 .with_sequence(5);
778 Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
779 }
780 }
781
782 let mut engine =
783 SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
784 engine
785 .record(SyncEvent::new("order.note_added", "order", "ORD-1", json!({"note": "a"})))
786 .unwrap();
787 engine
788 .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
789 .unwrap();
790
791 engine.pull(&ConflictTransport).await.unwrap();
792
793 let pending: Vec<_> = engine.outbox.peek(10).into_iter().cloned().collect();
794 assert_eq!(pending.len(), 1);
795 assert_eq!(pending[0].event_type, "order.note_added");
796 assert_eq!(engine.buffered_count(), 1);
797 }
798
799 #[tokio::test]
800 async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
801 #[derive(Debug)]
802 struct ConflictTransport;
803
804 #[async_trait::async_trait]
805 impl Transport for ConflictTransport {
806 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
807 Ok(PushResult { accepted: events.len(), remote_head: 10 })
808 }
809 async fn pull_events(
810 &self,
811 _since: u64,
812 _limit: usize,
813 ) -> Result<PullResult, SyncError> {
814 let remote_event =
815 SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
816 .with_sequence(5);
817 Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
818 }
819 }
820
821 let mut engine =
822 SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
823 engine
824 .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
825 .unwrap();
826
827 let result = engine.pull(&ConflictTransport).await.unwrap();
828 assert_eq!(result.events.len(), 1);
829 assert_eq!(engine.pending_count(), 1);
830 assert_eq!(engine.buffered_count(), 0);
831 }
832
833 #[tokio::test]
834 async fn full_sync_paginates_pull_until_complete() {
835 #[derive(Debug)]
836 struct PagingTransport {
837 pulls: Arc<AtomicU64>,
838 since_args: Arc<Mutex<Vec<u64>>>,
839 }
840
841 #[async_trait::async_trait]
842 impl Transport for PagingTransport {
843 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
844 Ok(PushResult { accepted: events.len(), remote_head: 0 })
845 }
846
847 async fn pull_events(
848 &self,
849 since: u64,
850 _limit: usize,
851 ) -> Result<PullResult, SyncError> {
852 self.since_args.lock().unwrap().push(since);
853 let call = self.pulls.fetch_add(1, Ordering::SeqCst);
854 if call == 0 {
855 Ok(PullResult {
856 events: vec![
857 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
858 .with_sequence(1),
859 ],
860 remote_head: 999,
862 has_more: true,
863 })
864 } else {
865 Ok(PullResult {
866 events: vec![
867 SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
868 .with_sequence(2),
869 ],
870 remote_head: 999,
871 has_more: false,
872 })
873 }
874 }
875 }
876
877 let since_args = Arc::new(Mutex::new(Vec::new()));
878 let transport = PagingTransport {
879 pulls: Arc::new(AtomicU64::new(0)),
880 since_args: Arc::clone(&since_args),
881 };
882 let mut engine = SyncEngine::new(make_config()).unwrap();
883 let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
884
885 assert_eq!(pull_result.events.len(), 2);
886 assert!(!pull_result.has_more);
887 assert_eq!(engine.buffered_count(), 2);
888 assert_eq!(engine.state().remote_head, 999);
889 assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
890 assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
891 }
892
893 #[tokio::test]
894 async fn pull_cursor_does_not_advance_from_push_remote_head() {
895 #[derive(Debug)]
896 struct HeadSkewTransport {
897 since_args: Arc<Mutex<Vec<u64>>>,
898 }
899
900 #[async_trait::async_trait]
901 impl Transport for HeadSkewTransport {
902 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
903 Ok(PushResult { accepted: events.len(), remote_head: 1000 })
904 }
905
906 async fn pull_events(
907 &self,
908 since: u64,
909 _limit: usize,
910 ) -> Result<PullResult, SyncError> {
911 self.since_args.lock().unwrap().push(since);
912 Ok(PullResult {
913 events: vec![
914 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
915 .with_sequence(7),
916 ],
917 remote_head: 1000,
918 has_more: false,
919 })
920 }
921 }
922
923 let since_args = Arc::new(Mutex::new(Vec::new()));
924 let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
925 let mut engine = SyncEngine::new(make_config()).unwrap();
926
927 engine.record(make_event("local.pending")).unwrap();
928 let (_push, pull) = engine.full_sync(&transport).await.unwrap();
929
930 assert_eq!(pull.events.len(), 1);
931 assert_eq!(&*since_args.lock().unwrap(), &[0]);
932 }
933
934 #[tokio::test]
935 async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
936 #[derive(Debug)]
937 struct StalledPagingTransport;
938
939 #[async_trait::async_trait]
940 impl Transport for StalledPagingTransport {
941 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
942 Ok(PushResult { accepted: events.len(), remote_head: 0 })
943 }
944
945 async fn pull_events(
946 &self,
947 _since: u64,
948 _limit: usize,
949 ) -> Result<PullResult, SyncError> {
950 Ok(PullResult {
953 events: vec![
954 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
955 .with_sequence(0),
956 ],
957 remote_head: 100,
958 has_more: true,
959 })
960 }
961 }
962
963 let mut engine = SyncEngine::new(make_config()).unwrap();
964 let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
965 assert!(matches!(err, SyncError::Transport(_)));
966 }
967
968 proptest! {
969 #[test]
970 fn resolve_next_cursor_enforces_monotonic_progress(
971 since in 0u64..20_000,
972 transport_cursor in prop::option::of(0u64..20_000),
973 sequences in prop::collection::vec(0u64..20_000, 0..64),
974 ) {
975 let events: Vec<SyncEvent> = sequences
976 .iter()
977 .enumerate()
978 .map(|(i, seq)| {
979 SyncEvent::new(
980 format!("evt-{i}"),
981 "entity",
982 format!("id-{i}"),
983 json!({ "s": seq }),
984 )
985 .with_sequence(*seq)
986 })
987 .collect();
988
989 let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
990 if let Some(cursor) = transport_cursor {
991 if cursor > since {
992 prop_assert_eq!(result.unwrap(), Some(cursor));
993 } else {
994 prop_assert!(matches!(result, Err(SyncError::Transport(_))));
995 }
996 } else if let Some(expected) = derive_next_cursor(since, &events) {
997 prop_assert_eq!(result.unwrap(), Some(expected));
998 } else {
999 prop_assert!(matches!(result, Err(SyncError::Transport(_))));
1000 }
1001 }
1002 }
1003
1004 proptest! {
1005 #[test]
1006 fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
1007 since in 0u64..20_000,
1008 transport_cursor in prop::option::of(0u64..20_000),
1009 sequences in prop::collection::vec(0u64..20_000, 0..64),
1010 ) {
1011 let events: Vec<SyncEvent> = sequences
1012 .iter()
1013 .enumerate()
1014 .map(|(i, seq)| {
1015 SyncEvent::new(
1016 format!("evt-{i}"),
1017 "entity",
1018 format!("id-{i}"),
1019 json!({}),
1020 )
1021 .with_sequence(*seq)
1022 })
1023 .collect();
1024
1025 let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
1026 .unwrap();
1027 prop_assert_eq!(result, None);
1028 }
1029 }
1030
/// The derived `Debug` output names the `SyncEngine` type.
#[test]
fn engine_debug() {
    let engine = SyncEngine::new(make_config()).unwrap();
    let rendered = format!("{engine:?}");
    assert!(rendered.contains("SyncEngine"));
}
1037}