1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
/// Safety cap on the number of pages a single `full_sync` will pull, so a
/// transport that keeps reporting `has_more = true` cannot loop forever.
const MAX_PULL_PAGES: usize = 10_000;
16
/// Core synchronization engine: records local events into a (possibly
/// persistent) outbox, pushes them through a [`Transport`], and pulls remote
/// events into a buffer while resolving conflicts against pending local work.
#[derive(Debug)]
pub struct SyncEngine {
    // Validated engine configuration (batch sizes, capacities, outbox path).
    config: SyncConfig,
    // Mutable sync bookkeeping: heads, pending count, last push/pull times.
    state: SyncState,
    // Queue of locally recorded events awaiting a successful push.
    outbox: Outbox,
    // Holds pulled (and merged) remote events until the caller drains them.
    buffer: EventBuffer,
    // Strategy-driven resolver applied when a pulled event collides with a
    // pending local event for the same entity.
    resolver: ConflictResolver,
    // Highest remote cursor observed across completed pull pages.
    last_pulled_sequence: u64,
    // Continuation cursor while a multi-page pull is in flight.
    next_pull_cursor: Option<u64>,
    // Always `true` once construction succeeds; surfaced via `status()`.
    initialized: bool,
}
51
impl SyncEngine {
    /// Creates an engine with the default conflict-resolution strategy.
    ///
    /// # Errors
    /// Returns an error if `config` is invalid or the persistent outbox
    /// cannot be opened (delegates to [`Self::try_new`]).
    pub fn new(config: SyncConfig) -> Result<Self, SyncError> {
        Self::try_new(config)
    }
62
    /// Creates an engine with an explicit conflict-resolution strategy.
    ///
    /// # Errors
    /// Same failure modes as [`Self::try_with_strategy`], to which this
    /// delegates.
    pub fn with_strategy(
        config: SyncConfig,
        strategy: ConflictStrategy,
    ) -> Result<Self, SyncError> {
        Self::try_with_strategy(config, strategy)
    }
75
    /// Fallible constructor using the default conflict strategy.
    ///
    /// # Errors
    /// Returns an error if `config` fails validation or the persistent
    /// outbox cannot be loaded.
    pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
        Self::try_with_strategy(config, ConflictStrategy::default())
    }
85
86 pub fn try_with_strategy(
93 config: SyncConfig,
94 strategy: ConflictStrategy,
95 ) -> Result<Self, SyncError> {
96 config.validate()?;
97 let buffer_capacity = config.resolved_buffer_capacity();
98 let outbox = if let Some(path) = config.outbox_path.as_deref() {
99 Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
100 } else {
101 Outbox::new(config.resolved_outbox_capacity())
102 };
103
104 Ok(Self {
105 config,
106 state: SyncState::default(),
107 outbox,
108 buffer: EventBuffer::new(buffer_capacity),
109 resolver: ConflictResolver::new(strategy),
110 last_pulled_sequence: 0,
111 next_pull_cursor: None,
112 initialized: true,
113 })
114 }
115
116 pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
124 let seq = self.outbox.append(event)?;
125 self.state.local_head = seq;
126 self.state.pending_count = self.outbox.count();
127 Ok(seq)
128 }
129
130 pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
138 let batch_size = self.config.resolved_batch_size();
139 let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();
140
141 if events.is_empty() {
142 return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
143 }
144
145 let result = transport.push_events(&events).await?;
146 let accepted = result.accepted.min(events.len());
147 if accepted > 0 {
148 let _ = self.outbox.drain(accepted);
149 }
150
151 self.state.remote_head = result.remote_head;
152 self.state.last_push = Some(Utc::now());
153 self.state.pending_count = self.outbox.count();
154
155 Ok(result)
156 }
157
158 pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
168 let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
169 let (result, next_cursor) = self.pull_since(transport, since).await?;
170 self.next_pull_cursor = next_cursor;
171 Ok(result)
172 }
173
    /// Pulls one page of events starting at `since` and reconciles them
    /// against the pending outbox.
    ///
    /// Returns the raw transport page plus the continuation cursor for the
    /// next page (`None` when the transport reports no more pages).
    async fn pull_since(
        &mut self,
        transport: &dyn Transport,
        since: u64,
    ) -> Result<(PullResult, Option<u64>), SyncError> {
        let limit = self.config.resolved_batch_size();
        let PullPage { result, next_cursor: transport_next_cursor } =
            transport.pull_events_page(since, limit).await?;

        // Snapshot all pending local events so conflict checks see a stable view.
        let pending: Vec<SyncEvent> =
            self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
        let mut drop_local_ids = HashSet::new();
        let mut events_to_buffer = Vec::with_capacity(result.events.len());

        for pulled_event in &result.events {
            let mut keep_remote = true;

            // Compare against every pending local event touching the same entity.
            for local_event in &pending {
                if local_event.entity_type == pulled_event.entity_type
                    && local_event.entity_id == pulled_event.entity_id
                {
                    match self.resolver.resolve(local_event, pulled_event) {
                        Resolution::KeepLocal => {
                            // Local wins: the remote event is discarded outright.
                            keep_remote = false;
                            break;
                        }
                        Resolution::KeepRemote => {
                            // Remote wins over this local event; keep scanning in
                            // case another pending event claims the entity.
                            drop_local_ids.insert(local_event.id);
                        }
                        Resolution::Merge(merged) => {
                            // Merged result replaces both sides; stop scanning.
                            drop_local_ids.insert(local_event.id);
                            events_to_buffer.push(merged);
                            keep_remote = false;
                            break;
                        }
                    }
                }
            }

            if keep_remote {
                events_to_buffer.push(pulled_event.clone());
            }
        }

        // Remove local events that lost their conflict, then refresh the count.
        if !drop_local_ids.is_empty() {
            self.outbox.retain(|event| !drop_local_ids.contains(&event.id));
            self.state.pending_count = self.outbox.count();
        }

        for event in events_to_buffer {
            self.buffer.push(event);
        }

        self.state.remote_head = result.remote_head;
        self.state.last_pull = Some(Utc::now());
        // Track pull progress monotonically: prefer a transport cursor that
        // actually advances, fall back to the page's event sequences, and
        // never move backwards.
        let observed_cursor = transport_next_cursor
            .filter(|cursor| *cursor > since)
            .or_else(|| derive_next_cursor(since, &result.events))
            .unwrap_or(since);
        self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);

        let next_cursor = Self::resolve_next_cursor(
            since,
            &result.events,
            result.has_more,
            transport_next_cursor,
        )?;

        Ok((result, next_cursor))
    }
246
247 fn resolve_next_cursor(
248 since: u64,
249 events: &[SyncEvent],
250 has_more: bool,
251 transport_next_cursor: Option<u64>,
252 ) -> Result<Option<u64>, SyncError> {
253 if !has_more {
254 return Ok(None);
255 }
256
257 let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
258 let Some(next_cursor) = next_cursor else {
259 return Err(SyncError::Transport(
260 "pull pagination stalled: has_more=true but no advancing cursor available"
261 .to_string(),
262 ));
263 };
264 if next_cursor <= since {
265 return Err(SyncError::Transport(format!(
266 "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
267 )));
268 }
269 Ok(Some(next_cursor))
270 }
271
    /// Snapshot of the engine's observable state for monitoring/telemetry.
    #[must_use]
    pub fn status(&self) -> SyncStatus {
        SyncStatus {
            initialized: self.initialized,
            local_head: self.state.local_head,
            remote_head: self.state.remote_head,
            // Counted from the outbox directly rather than cached state, so
            // the snapshot cannot go stale.
            pending: self.outbox.count(),
            lag: self.state.lag(),
            last_push: self.state.last_push,
            last_pull: self.state.last_pull,
            buffered_events: self.buffer.len(),
        }
    }

    /// Number of locally recorded events still awaiting a push.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.outbox.count()
    }

    /// Number of pulled events waiting to be drained by the caller.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.buffer.len()
    }

    /// Removes and returns every buffered (pulled/merged) event.
    pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
        self.buffer.drain_all()
    }

    /// Borrow the internal sync bookkeeping state.
    #[must_use]
    pub const fn state(&self) -> &SyncState {
        &self.state
    }

    /// Borrow the engine configuration.
    #[must_use]
    pub const fn config(&self) -> &SyncConfig {
        &self.config
    }

    /// Borrow the conflict resolver (exposes the active strategy).
    #[must_use]
    pub const fn resolver(&self) -> &ConflictResolver {
        &self.resolver
    }
321
    /// Pushes all pending events, then pulls remote pages until the transport
    /// reports completion, merging every page into a single `PullResult`.
    ///
    /// # Errors
    /// Fails if the transport errors, if pagination stalls without an
    /// advancing cursor, or if more than `MAX_PULL_PAGES` pages are fetched.
    pub async fn full_sync(
        &mut self,
        transport: &dyn Transport,
    ) -> Result<(PushResult, PullResult), SyncError> {
        let push_result = self.push(transport).await?;
        let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
        let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
        self.next_pull_cursor = next_cursor;
        let mut pull_pages = 1;

        while pull_result.has_more {
            // Hard stop against transports that never clear `has_more`.
            if pull_pages >= MAX_PULL_PAGES {
                return Err(SyncError::Transport(
                    "pull pagination exceeded safety limit".to_string(),
                ));
            }

            // `pull_since` guarantees a strictly advancing cursor whenever
            // `has_more` is true, so a missing cursor here is a logic error.
            since = self.next_pull_cursor.ok_or_else(|| {
                SyncError::Transport(
                    "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
                )
            })?;

            let (next_page, page_cursor) = self.pull_since(transport, since).await?;
            self.next_pull_cursor = page_cursor;

            // Accumulate pages into one combined result for the caller.
            pull_result.events.extend(next_page.events);
            pull_result.remote_head = next_page.remote_head;
            pull_result.has_more = next_page.has_more;
            pull_pages += 1;
        }

        // Pagination finished: clear the continuation cursor.
        self.next_pull_cursor = None;
        Ok((push_result, pull_result))
    }
}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::NullTransport;
    use proptest::prelude::*;
    use serde_json::json;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tempfile::tempdir;

    // Shared fixture: a minimal valid configuration.
    fn make_config() -> SyncConfig {
        SyncConfig::new("agent-1", "tenant-1", "store-1")
    }

    // Shared fixture: an event for entity ("order", "ORD-1") with an empty payload.
    fn make_event(event_type: &str) -> SyncEvent {
        SyncEvent::new(event_type, "order", "ORD-1", json!({}))
    }
382
    // A fresh engine has nothing pending or buffered and reports initialized.
    #[test]
    fn new_engine() {
        let engine = SyncEngine::new(make_config()).unwrap();
        assert_eq!(engine.pending_count(), 0);
        assert_eq!(engine.buffered_count(), 0);
        assert!(engine.status().initialized);
    }

    // Recording assigns sequence 1 and advances the local head.
    #[test]
    fn record_event() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        let seq = engine.record(make_event("order.created")).unwrap();
        assert_eq!(seq, 1);
        assert_eq!(engine.pending_count(), 1);
        assert_eq!(engine.state().local_head, 1);
    }

    // Sequences are monotonically assigned per recorded event.
    #[test]
    fn record_multiple_events() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();
        engine.record(make_event("c")).unwrap();
        assert_eq!(engine.pending_count(), 3);
        assert_eq!(engine.state().local_head, 3);
    }

    // A successful push drains the outbox and stamps last_push.
    #[tokio::test]
    async fn push_with_null_transport() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();

        let transport = NullTransport::new();
        let result = engine.push(&transport).await.unwrap();
        assert_eq!(result.accepted, 2);
        assert_eq!(engine.pending_count(), 0);
        assert!(engine.state().last_push.is_some());
    }

    // Pushing with nothing pending is a no-op that reports zero accepted.
    #[tokio::test]
    async fn push_empty_outbox() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        let transport = NullTransport::new();
        let result = engine.push(&transport).await.unwrap();
        assert_eq!(result.accepted, 0);
    }

    // An empty pull still stamps last_pull.
    #[tokio::test]
    async fn pull_with_null_transport() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        let transport = NullTransport::new();
        let result = engine.pull(&transport).await.unwrap();
        assert!(result.events.is_empty());
        assert!(!result.has_more);
        assert!(engine.state().last_pull.is_some());
    }
440
    // Pulled events with no local conflicts all land in the buffer and
    // advance remote_head.
    #[tokio::test]
    async fn pull_buffers_events() {
        #[derive(Debug)]
        struct MockPullTransport {
            events: Vec<SyncEvent>,
            head: u64,
        }

        #[async_trait::async_trait]
        impl Transport for MockPullTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: self.head })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult {
                    events: self.events.clone(),
                    remote_head: self.head,
                    has_more: false,
                })
            }
        }

        let transport = MockPullTransport {
            events: vec![
                make_event("pulled-1").with_sequence(1),
                make_event("pulled-2").with_sequence(2),
            ],
            head: 2,
        };

        let mut engine = SyncEngine::new(make_config()).unwrap();
        let result = engine.pull(&transport).await.unwrap();
        assert_eq!(result.events.len(), 2);
        assert_eq!(engine.buffered_count(), 2);
        assert_eq!(engine.state().remote_head, 2);
    }

    // full_sync pushes pending events and then performs a (here empty) pull.
    #[tokio::test]
    async fn full_sync() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("local")).unwrap();

        let transport = NullTransport::new();
        let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
        assert_eq!(push_result.accepted, 1);
        assert!(pull_result.events.is_empty());
        assert_eq!(engine.pending_count(), 0);
    }
494
    // status() mirrors heads, pending count, and push/pull timestamps.
    #[test]
    fn status_reporting() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();

        let status = engine.status();
        assert!(status.initialized);
        assert_eq!(status.pending, 2);
        assert_eq!(status.local_head, 2);
        assert_eq!(status.remote_head, 0);
        assert_eq!(status.lag, 0);
        assert!(status.last_push.is_none());
    }

    // drain_buffer empties the buffer and hands back its contents.
    #[test]
    fn drain_buffer() {
        let mut engine = SyncEngine::new(make_config()).unwrap();
        // Push directly into the private buffer to isolate drain behavior.
        engine.buffer.push(make_event("buffered"));
        assert_eq!(engine.buffered_count(), 1);

        let drained = engine.drain_buffer();
        assert_eq!(drained.len(), 1);
        assert_eq!(engine.buffered_count(), 0);
    }

    // with_strategy wires the requested strategy into the resolver.
    #[test]
    fn engine_with_strategy() {
        let engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
        assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
    }

    // config() exposes the configuration the engine was built with.
    #[test]
    fn config_accessor() {
        let config = make_config();
        let engine = SyncEngine::new(config).unwrap();
        assert_eq!(engine.config().agent_id, "agent-1");
    }

    // Construction fails fast on an invalid (empty agent id) configuration.
    #[test]
    fn try_new_rejects_invalid_config() {
        let bad = SyncConfig::new("", "tenant", "store");
        assert!(SyncEngine::try_new(bad).is_err());
    }

    // A file-backed outbox survives engine teardown and is reloaded on the
    // next construction with the same path.
    #[test]
    fn try_new_with_persistent_outbox_restores_pending_events() {
        let dir = tempdir().unwrap();
        let outbox_path = dir.path().join("sync-outbox.json");
        let path_str = outbox_path.to_string_lossy().to_string();

        {
            let mut engine = SyncEngine::try_new(
                SyncConfig::new("agent-1", "tenant-1", "store-1")
                    .with_outbox_path(path_str.clone()),
            )
            .unwrap();
            engine.record(make_event("persisted-a")).unwrap();
            engine.record(make_event("persisted-b")).unwrap();
            assert_eq!(engine.pending_count(), 2);
        }

        let engine = SyncEngine::try_new(
            SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path_str),
        )
        .unwrap();
        assert_eq!(engine.pending_count(), 2);
    }
564
    // Only one batch (size 2) is pushed per call; the third event stays pending.
    #[tokio::test]
    async fn push_respects_batch_size() {
        let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
        let mut engine = SyncEngine::new(config).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();
        engine.record(make_event("c")).unwrap();

        let transport = NullTransport::new();
        let result = engine.push(&transport).await.unwrap();
        assert_eq!(result.accepted, 2);
        assert_eq!(engine.pending_count(), 1);
    }

    // The remote head returned by the transport is mirrored into engine state.
    #[tokio::test]
    async fn push_updates_state() {
        #[derive(Debug)]
        struct MockHeadTransport {
            head: Arc<AtomicU64>,
        }

        #[async_trait::async_trait]
        impl Transport for MockHeadTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                // fetch_add returns the OLD value, so add the batch size back
                // to get the post-push head.
                let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
                    + events.len() as u64;
                Ok(PushResult { accepted: events.len(), remote_head: new_head })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult {
                    events: vec![],
                    remote_head: self.head.load(Ordering::SeqCst),
                    has_more: false,
                })
            }
        }

        let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();

        let result = engine.push(&transport).await.unwrap();
        assert_eq!(result.remote_head, 2);
        assert_eq!(engine.state().remote_head, 2);
    }

    // Transport failures surface as SyncError::Transport and leave the
    // outbox untouched.
    #[tokio::test]
    async fn transport_error_propagates() {
        #[derive(Debug)]
        struct FailTransport;

        #[async_trait::async_trait]
        impl Transport for FailTransport {
            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Err(SyncError::Transport("network down".into()))
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Err(SyncError::Transport("network down".into()))
            }
        }

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();

        let transport = FailTransport;
        let result = engine.push(&transport).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
        assert_eq!(engine.pending_count(), 1);

        let pull_result = engine.pull(&transport).await;
        assert!(pull_result.is_err());
    }

    // When the transport accepts fewer events than sent, only that many are
    // drained; the rest remain pending.
    #[tokio::test]
    async fn push_only_drains_accepted_events() {
        #[derive(Debug)]
        struct PartialAcceptTransport;

        #[async_trait::async_trait]
        impl Transport for PartialAcceptTransport {
            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: 1, remote_head: 1 })
            }

            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
            }
        }

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();
        engine.record(make_event("c")).unwrap();

        let result = engine.push(&PartialAcceptTransport).await.unwrap();
        assert_eq!(result.accepted, 1);
        assert_eq!(engine.pending_count(), 2);
    }
683
    // RemoteWins: the conflicting local event is dropped from the outbox and
    // the remote event is buffered.
    #[tokio::test]
    async fn pull_conflict_resolution() {
        #[derive(Debug)]
        struct ConflictTransport;

        #[async_trait::async_trait]
        impl Transport for ConflictTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 10 })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                let remote_event =
                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
                        .with_sequence(5);
                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
            }
        }

        let mut engine =
            SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
        engine
            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
            .unwrap();

        let transport = ConflictTransport;
        let result = engine.pull(&transport).await.unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(engine.buffered_count(), 1);
        assert_eq!(engine.pending_count(), 0);
    }

    // LocalWins: the pending local event survives and the remote event is
    // not buffered (though it still appears in the raw pull result).
    #[tokio::test]
    async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
        #[derive(Debug)]
        struct ConflictTransport;

        #[async_trait::async_trait]
        impl Transport for ConflictTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 10 })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                let remote_event =
                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
                        .with_sequence(5);
                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
            }
        }

        let mut engine =
            SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
        engine
            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
            .unwrap();

        let result = engine.pull(&ConflictTransport).await.unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(engine.pending_count(), 1);
        assert_eq!(engine.buffered_count(), 0);
    }
755
    // full_sync keeps pulling while has_more=true, advancing `since` from the
    // sequences of the previous page (0, then 1).
    #[tokio::test]
    async fn full_sync_paginates_pull_until_complete() {
        #[derive(Debug)]
        struct PagingTransport {
            pulls: Arc<AtomicU64>,
            since_args: Arc<Mutex<Vec<u64>>>,
        }

        #[async_trait::async_trait]
        impl Transport for PagingTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 0 })
            }

            async fn pull_events(
                &self,
                since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                // Record every `since` value so the test can assert cursor progress.
                self.since_args.lock().unwrap().push(since);
                let call = self.pulls.fetch_add(1, Ordering::SeqCst);
                if call == 0 {
                    Ok(PullResult {
                        events: vec![
                            SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
                                .with_sequence(1),
                        ],
                        remote_head: 999,
                        has_more: true,
                    })
                } else {
                    Ok(PullResult {
                        events: vec![
                            SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
                                .with_sequence(2),
                        ],
                        remote_head: 999,
                        has_more: false,
                    })
                }
            }
        }

        let since_args = Arc::new(Mutex::new(Vec::new()));
        let transport = PagingTransport {
            pulls: Arc::new(AtomicU64::new(0)),
            since_args: Arc::clone(&since_args),
        };
        let mut engine = SyncEngine::new(make_config()).unwrap();
        let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();

        assert_eq!(pull_result.events.len(), 2);
        assert!(!pull_result.has_more);
        assert_eq!(engine.buffered_count(), 2);
        assert_eq!(engine.state().remote_head, 999);
        assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
        assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
    }

    // A large remote_head reported during push must NOT advance the pull
    // cursor; the subsequent pull still starts from 0.
    #[tokio::test]
    async fn pull_cursor_does_not_advance_from_push_remote_head() {
        #[derive(Debug)]
        struct HeadSkewTransport {
            since_args: Arc<Mutex<Vec<u64>>>,
        }

        #[async_trait::async_trait]
        impl Transport for HeadSkewTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 1000 })
            }

            async fn pull_events(
                &self,
                since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                self.since_args.lock().unwrap().push(since);
                Ok(PullResult {
                    events: vec![
                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
                            .with_sequence(7),
                    ],
                    remote_head: 1000,
                    has_more: false,
                })
            }
        }

        let since_args = Arc::new(Mutex::new(Vec::new()));
        let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
        let mut engine = SyncEngine::new(make_config()).unwrap();

        engine.record(make_event("local.pending")).unwrap();
        let (_push, pull) = engine.full_sync(&transport).await.unwrap();

        assert_eq!(pull.events.len(), 1);
        assert_eq!(&*since_args.lock().unwrap(), &[0]);
    }
856
    // has_more=true with an event at sequence 0 (== since) cannot advance the
    // cursor, so pull must fail instead of looping.
    #[tokio::test]
    async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
        #[derive(Debug)]
        struct StalledPagingTransport;

        #[async_trait::async_trait]
        impl Transport for StalledPagingTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 0 })
            }

            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult {
                    events: vec![
                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
                            .with_sequence(0),
                    ],
                    remote_head: 100,
                    has_more: true,
                })
            }
        }

        let mut engine = SyncEngine::new(make_config()).unwrap();
        let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
        assert!(matches!(err, SyncError::Transport(_)));
    }
890
    proptest! {
        // Property: with has_more=true, resolve_next_cursor either returns a
        // cursor strictly greater than `since` or errors — it never stalls.
        #[test]
        fn resolve_next_cursor_enforces_monotonic_progress(
            since in 0u64..20_000,
            transport_cursor in prop::option::of(0u64..20_000),
            sequences in prop::collection::vec(0u64..20_000, 0..64),
        ) {
            let events: Vec<SyncEvent> = sequences
                .iter()
                .enumerate()
                .map(|(i, seq)| {
                    SyncEvent::new(
                        format!("evt-{i}"),
                        "entity",
                        format!("id-{i}"),
                        json!({ "s": seq }),
                    )
                    .with_sequence(*seq)
                })
                .collect();

            let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
            if let Some(cursor) = transport_cursor {
                // A transport-supplied cursor wins when it advances; a
                // non-advancing one is an error (no fallback to derivation).
                if cursor > since {
                    prop_assert_eq!(result.unwrap(), Some(cursor));
                } else {
                    prop_assert!(matches!(result, Err(SyncError::Transport(_))));
                }
            } else if let Some(expected) = derive_next_cursor(since, &events) {
                prop_assert_eq!(result.unwrap(), Some(expected));
            } else {
                prop_assert!(matches!(result, Err(SyncError::Transport(_))));
            }
        }
    }

    proptest! {
        // Property: has_more=false always yields no continuation cursor,
        // regardless of events or any transport cursor.
        #[test]
        fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
            since in 0u64..20_000,
            transport_cursor in prop::option::of(0u64..20_000),
            sequences in prop::collection::vec(0u64..20_000, 0..64),
        ) {
            let events: Vec<SyncEvent> = sequences
                .iter()
                .enumerate()
                .map(|(i, seq)| {
                    SyncEvent::new(
                        format!("evt-{i}"),
                        "entity",
                        format!("id-{i}"),
                        json!({}),
                    )
                    .with_sequence(*seq)
                })
                .collect();

            let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
                .unwrap();
            prop_assert_eq!(result, None);
        }
    }
953
    // The derived Debug implementation includes the type name.
    #[test]
    fn engine_debug() {
        let engine = SyncEngine::new(make_config()).unwrap();
        let debug = format!("{engine:?}");
        assert!(debug.contains("SyncEngine"));
    }
}