1use std::{collections::HashMap, time::Duration};
4
5use chrono::{DateTime, Utc};
6use force::{auth::Authenticator, client::ForceClient};
7use serde_json::Value;
8
9use crate::{
10 ApplyLane, ObjectSync, PlannerContext,
11 apply::{
12 postgres::project_sync_link,
13 salesforce::{ApplyError, SalesforceApplier},
14 },
15 capture,
16 error::ForceSyncError,
17 identity::SyncKey,
18 model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
19 plan_change, reconcile,
20 store::pg::{LeasedTask, PgStore, SyncConflict},
21};
22
/// Journal-backed context loaded for a single leased apply task.
struct ApplyTaskContext {
    // Journal row ID backing the task; unused at runtime, kept for debugging.
    _journal_id: i64,
    // The captured change to apply.
    envelope: ChangeEnvelope,
    // Payload last known to be applied (resolved via the link's
    // `last_payload_hash`), if any.
    current_payload: Option<Value>,
}
28
/// Sync engine that captures Postgres changes, applies them to Salesforce,
/// and reconciles state, driven by explicit `run_*_once` passes.
#[derive(Debug, Clone)]
pub struct SyncEngine<A: Authenticator> {
    // Postgres store holding journal, tasks, links, and conflicts.
    store: PgStore,
    // Applier wrapping the Salesforce REST/Bulk client.
    salesforce: SalesforceApplier<A>,
    // Per-object sync configuration, keyed by Salesforce object name.
    objects: HashMap<String, ObjectSync>,
    capture_batch_size: i64,
    capture_priority: i32,
    apply_batch_size: i64,
    reconcile_batch_size: i64,
    // Lease duration passed to `lease_ready_tasks`.
    lease_for: Duration,
    // Worker identity used for leasing/acking/failing tasks.
    worker_id: String,
}
42
/// Builder for [`SyncEngine`]; `postgres` and at least one `object` are
/// required, everything else has defaults (see `SyncEngineBuilder::new`).
#[derive(Debug)]
pub struct SyncEngineBuilder<A: Authenticator> {
    salesforce_client: ForceClient<A>,
    // Required: validated in `build`.
    postgres: Option<PgStore>,
    // Required non-empty: validated in `build`.
    objects: Vec<ObjectSync>,
    capture_batch_size: i64,
    capture_priority: i32,
    apply_batch_size: i64,
    reconcile_batch_size: i64,
    lease_for: Duration,
    worker_id: String,
}
56
57impl<A: Authenticator> SyncEngine<A> {
    /// Starts a builder for a `SyncEngine` backed by the given Salesforce client.
    #[must_use]
    pub fn builder(salesforce_client: ForceClient<A>) -> SyncEngineBuilder<A> {
        SyncEngineBuilder::new(salesforce_client)
    }
63
    /// Runs one Postgres change-capture pass, returning how many changes were
    /// captured.
    ///
    /// # Errors
    /// Propagates capture/storage errors from the batch run.
    pub async fn run_capture_postgres_once(&self) -> Result<usize, ForceSyncError> {
        capture::postgres::capture_batch(
            &self.store,
            self.capture_batch_size,
            self.capture_priority,
        )
        .await
    }
77
78 pub async fn run_apply_once(&self) -> Result<usize, ForceSyncError> {
84 let leased = self
85 .store
86 .lease_ready_tasks(&self.worker_id, self.apply_batch_size, self.lease_for)
87 .await?;
88 let batch_size = leased.len().max(1);
89 let mut applied = 0usize;
90
91 for task in leased {
92 if self.process_leased_task(&task, batch_size).await? {
93 applied += 1;
94 }
95 }
96
97 Ok(applied)
98 }
99
    /// Runs one reconcile pass; the batch size is clamped to at least one.
    pub async fn run_reconcile_once(&self) -> Result<usize, ForceSyncError> {
        reconcile::run_reconcile_once(&self.store, self.reconcile_batch_size.max(1)).await
    }
108
    /// Loads, plans, and applies a single leased task.
    ///
    /// Returns `Ok(true)` when the change was applied; `Ok(false)` when the
    /// task ended as a no-op, failure, or conflict.
    async fn process_leased_task(
        &self,
        task: &LeasedTask,
        batch_size: usize,
    ) -> Result<bool, ForceSyncError> {
        let context = self.load_apply_task_context(task.task_id).await?;
        let Some(context) = context else {
            // The journal row backing this task is gone; fail the task rather
            // than re-leasing it forever.
            let _ = self
                .store
                .fail_task_for_worker(
                    &self.worker_id,
                    task.task_id,
                    "missing apply task journal row",
                )
                .await?;
            return Ok(false);
        };

        // Every journal object must have a registered ObjectSync configuration.
        let object = self
            .objects
            .get(context.envelope.sync_key().object_name())
            .ok_or(ForceSyncError::MissingConfiguration {
                field: "object sync",
            })?;

        let existing_link = self
            .store
            .get_link(
                context.envelope.sync_key().tenant(),
                context.envelope.sync_key().object_name(),
                context.envelope.sync_key().external_id(),
            )
            .await?;

        let decision = plan_change(
            &PlannerContext {
                object: object.clone(),
                current_payload: context.current_payload.clone(),
                batch_size,
                // Batches small enough for the REST lane are treated as urgent.
                urgent: batch_size <= object.lane_thresholds().rest_max_batch_size(),
                has_dependencies: false,
            },
            &context.envelope,
        );

        self.apply_planned_task(
            task,
            &context.envelope,
            existing_link.as_ref(),
            object,
            decision,
        )
        .await
    }
163
    /// Dispatches a planned change to the lane chosen by the planner.
    ///
    /// Conflicts are persisted and fail the task; no-ops ack immediately;
    /// Salesforce-sourced changes are projected locally; otherwise the REST or
    /// Bulk lane applies the change. Returns whether the change was applied.
    async fn apply_planned_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        existing_link: Option<&crate::store::pg::SyncLink>,
        object: &ObjectSync,
        decision: crate::plan::PlanDecision,
    ) -> Result<bool, ForceSyncError> {
        if decision.lane == ApplyLane::Conflict {
            // Record one conflict row per conflicting field, then fail the
            // task so it is not retried automatically.
            for field_name in &decision.conflicts {
                let conflict = SyncConflict {
                    tenant: envelope.sync_key().tenant().to_owned(),
                    object_name: envelope.sync_key().object_name().to_owned(),
                    external_id: envelope.sync_key().external_id().to_owned(),
                    field_name: field_name.clone(),
                    // NOTE(review): the conflicting values themselves are not
                    // captured here — both sides are stored as nulls; confirm
                    // this is intentional.
                    left_value: Value::Null,
                    right_value: Value::Null,
                    resolution: None,
                };
                self.store.insert_conflict(&conflict).await?;
            }
            let _ = self
                .store
                .fail_task_for_worker(
                    &self.worker_id,
                    task.task_id,
                    format!("planner conflict: {}", decision.conflicts.join(",")),
                )
                .await?;
            return Ok(false);
        }

        if decision.lane == ApplyLane::Noop {
            // Nothing to apply; ack so the task is not re-leased.
            self.store
                .ack_task_for_worker(&self.worker_id, task.task_id)
                .await?;
            return Ok(false);
        }

        // Changes that originated in Salesforce only update the local link.
        if should_project_locally(envelope.source(), decision.lane) {
            return self.apply_local_task(task, envelope, existing_link).await;
        }

        match decision.lane {
            ApplyLane::Rest => {
                // The planner may supply a reduced payload; otherwise use the
                // full envelope payload.
                let payload = decision
                    .payload
                    .as_ref()
                    .unwrap_or_else(|| envelope.payload());
                let success = self
                    .apply_rest_task(task, envelope, payload, existing_link, object)
                    .await?;
                Ok(success)
            }
            ApplyLane::Bulk => {
                let payload = decision
                    .payload
                    .as_ref()
                    .unwrap_or_else(|| envelope.payload());
                let success = self
                    .apply_bulk_task(task, envelope, payload, existing_link, object)
                    .await?;
                Ok(success)
            }
            ApplyLane::CompositeGraph => {
                // Not implemented at runtime; fail with a descriptive reason.
                let _ = self
                    .store
                    .fail_task_for_worker(
                        &self.worker_id,
                        task.task_id,
                        format!("unsupported runtime lane: {:?}", decision.lane),
                    )
                    .await?;
                Ok(false)
            }
            ApplyLane::Conflict | ApplyLane::Noop => unreachable!("handled above"),
        }
    }
242
    /// Projects a Salesforce-sourced change into the local `sync_link` row
    /// without calling back out to Salesforce, then acks the task.
    async fn apply_local_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        existing_link: Option<&crate::store::pg::SyncLink>,
    ) -> Result<bool, ForceSyncError> {
        let salesforce_id = local_projection_salesforce_id(existing_link, envelope)?;
        // Deletes are projected as tombstones on the link.
        let link = project_sync_link(
            existing_link,
            envelope,
            salesforce_id.as_ref(),
            matches!(envelope.operation(), ChangeOperation::Delete),
        );
        self.store.put_link(&link).await?;
        self.store
            .ack_task_for_worker(&self.worker_id, task.task_id)
            .await?;
        Ok(true)
    }
262
    /// Applies a change via the Bulk lane, falling back to the REST lane when
    /// the record has no linked Salesforce ID, when that ID fails validation,
    /// or for delete operations.
    async fn apply_bulk_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        payload: &Value,
        existing_link: Option<&crate::store::pg::SyncLink>,
        object: &ObjectSync,
    ) -> Result<bool, ForceSyncError> {
        match envelope.operation() {
            ChangeOperation::Upsert => {
                // Bulk upserts are only attempted for records already linked
                // to a Salesforce ID; creation goes through REST.
                let Some(existing_salesforce_id) =
                    existing_link.and_then(|link| link.salesforce_id.as_deref())
                else {
                    return self
                        .apply_rest_task(task, envelope, payload, existing_link, object)
                        .await;
                };

                // NOTE(review): the job is submitted with a batch count of 1
                // and a single record — confirm this matches the bulk API's
                // intended usage for real batches.
                let job_result = self
                    .salesforce
                    .apply_bulk_upsert(
                        envelope.sync_key().object_name(),
                        object
                            .external_id_field()
                            .ok_or(ForceSyncError::MissingConfiguration {
                                field: "external_id_field",
                            })?,
                        1,
                        vec![payload.clone()],
                    )
                    .await;
                if let Err(error) = job_result {
                    return self.handle_apply_error(task, error).await;
                }

                // Re-validate the stored ID; fall back to REST instead of
                // writing a link with a malformed Salesforce ID.
                let Some(salesforce_id) =
                    force::types::SalesforceId::new(existing_salesforce_id.to_owned()).ok()
                else {
                    return self
                        .apply_rest_task(task, envelope, payload, existing_link, object)
                        .await;
                };

                let link = project_sync_link(existing_link, envelope, Some(&salesforce_id), false);
                self.store.put_link(&link).await?;
                self.store
                    .ack_task_for_worker(&self.worker_id, task.task_id)
                    .await?;
                Ok(true)
            }
            ChangeOperation::Delete => {
                // Deletes always go through the REST lane.
                self.apply_rest_task(task, envelope, payload, existing_link, object)
                    .await
            }
        }
    }
319
    /// Applies a change via the REST lane: upserts by external ID, deletes by
    /// the Salesforce ID recorded on the existing link.
    ///
    /// On success the `sync_link` row is updated and the task acked; apply
    /// errors are routed through `handle_apply_error` (retry or fail).
    async fn apply_rest_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        payload: &Value,
        existing_link: Option<&crate::store::pg::SyncLink>,
        object: &ObjectSync,
    ) -> Result<bool, ForceSyncError> {
        match envelope.operation() {
            ChangeOperation::Upsert => {
                let result = self
                    .salesforce
                    .apply_rest_upsert(
                        envelope.sync_key().object_name(),
                        object
                            .external_id_field()
                            .ok_or(ForceSyncError::MissingConfiguration {
                                field: "external_id_field",
                            })?,
                        envelope.sync_key().external_id(),
                        payload,
                    )
                    .await;

                match result {
                    Ok(result) => {
                        // Record the (possibly newly created) Salesforce ID on
                        // the link, then ack.
                        let link = project_sync_link(
                            existing_link,
                            envelope,
                            result.salesforce_id.as_ref(),
                            false,
                        );
                        self.store.put_link(&link).await?;
                        self.store
                            .ack_task_for_worker(&self.worker_id, task.task_id)
                            .await?;
                        Ok(true)
                    }
                    Err(error) => self.handle_apply_error(task, error).await,
                }
            }
            ChangeOperation::Delete => {
                // A delete needs a known Salesforce ID; without a link (or
                // without an ID on the link) the task is failed.
                let Some(existing_link) = existing_link else {
                    let _ = self
                        .store
                        .fail_task_for_worker(
                            &self.worker_id,
                            task.task_id,
                            "missing Salesforce ID for delete",
                        )
                        .await?;
                    return Ok(false);
                };
                let Some(salesforce_id) = existing_link.salesforce_id.as_deref() else {
                    let _ = self
                        .store
                        .fail_task_for_worker(
                            &self.worker_id,
                            task.task_id,
                            "missing Salesforce ID for delete",
                        )
                        .await?;
                    return Ok(false);
                };
                let salesforce_id = force::types::SalesforceId::new(salesforce_id.to_owned())
                    .map_err(|error| ForceSyncError::InvalidStoredValue {
                        field: "salesforce_id",
                        value: error.to_string(),
                    })?;

                match self
                    .salesforce
                    .apply_rest_delete(envelope.sync_key().object_name(), &salesforce_id)
                    .await
                {
                    Ok(()) => {
                        // Tombstone the link (last argument `true`) and ack.
                        let link = project_sync_link(
                            existing_link.into(),
                            envelope,
                            Some(&salesforce_id),
                            true,
                        );
                        self.store.put_link(&link).await?;
                        self.store
                            .ack_task_for_worker(&self.worker_id, task.task_id)
                            .await?;
                        Ok(true)
                    }
                    Err(error) => self.handle_apply_error(task, error).await,
                }
            }
        }
    }
413
414 async fn handle_apply_error(
415 &self,
416 task: &LeasedTask,
417 error: ApplyError,
418 ) -> Result<bool, ForceSyncError> {
419 let error_message = error.to_string();
420 match error {
421 ApplyError::Retryable(_) => {
422 self.store
423 .retry_task_for_worker(
424 &self.worker_id,
425 task.task_id,
426 Utc::now() + chrono::Duration::seconds(30),
427 error_message,
428 )
429 .await?;
430 }
431 ApplyError::Permanent(_) => {
432 self.store
433 .fail_task_for_worker(&self.worker_id, task.task_id, error_message)
434 .await?;
435 }
436 }
437
438 Ok(false)
439 }
440
    /// Loads the journal envelope for a task, together with the last-applied
    /// payload when the link's `last_payload_hash` still matches a journal row.
    ///
    /// Returns `Ok(None)` when the task or its journal row no longer exists.
    async fn load_apply_task_context(
        &self,
        task_id: i64,
    ) -> Result<Option<ApplyTaskContext>, ForceSyncError> {
        let client = self.store.pool().get().await?;
        // The lateral join resolves the currently-applied payload by matching
        // the link's last_payload_hash back into the journal, newest first.
        let row = client
            .query_opt(
                "select
                    j.journal_id,
                    j.tenant,
                    j.object_name,
                    j.external_id,
                    j.source,
                    j.source_cursor,
                    j.observed_at,
                    j.operation,
                    j.payload::text as payload_json,
                    applied.payload::text as current_payload_json
                from sync_task t
                join sync_journal j
                    on (t.payload->>'journal_id')::bigint = j.journal_id
                left join sync_link link
                    on link.tenant = j.tenant
                    and link.object_name = j.object_name
                    and link.external_id = j.external_id
                left join lateral (
                    select payload
                    from sync_journal applied
                    where applied.tenant = j.tenant
                    and applied.object_name = j.object_name
                    and applied.external_id = j.external_id
                    and link.last_payload_hash is not null
                    and applied.payload_hash = link.last_payload_hash
                    order by applied.journal_id desc
                    limit 1
                ) applied on true
                where t.task_id = $1",
                &[&task_id],
            )
            .await?;

        row.as_ref().map(build_apply_task_context).transpose()
    }
484}
485
486impl<A: Authenticator> SyncEngineBuilder<A> {
487 fn new(salesforce_client: ForceClient<A>) -> Self {
488 Self {
489 salesforce_client,
490 postgres: None,
491 objects: Vec::new(),
492 capture_batch_size: 100,
493 capture_priority: 0,
494 apply_batch_size: 1,
495 reconcile_batch_size: 100,
496 lease_for: Duration::from_secs(30),
497 worker_id: "force-sync-apply".to_owned(),
498 }
499 }
500
501 #[must_use]
503 pub fn postgres(mut self, postgres: PgStore) -> Self {
504 self.postgres = Some(postgres);
505 self
506 }
507
    /// Registers an object sync configuration; call once per synced object.
    #[must_use]
    pub fn object(mut self, object: ObjectSync) -> Self {
        self.objects.push(object);
        self
    }
514
    /// Overrides the reconcile batch size (default 100; clamped to >= 1 at run
    /// time).
    #[must_use]
    pub const fn reconcile_batch_size(mut self, reconcile_batch_size: i64) -> Self {
        self.reconcile_batch_size = reconcile_batch_size;
        self
    }
521
    /// Overrides how many tasks a single apply pass may lease (default 1).
    #[must_use]
    pub const fn apply_batch_size(mut self, apply_batch_size: i64) -> Self {
        self.apply_batch_size = apply_batch_size;
        self
    }
528
529 pub fn build(self) -> Result<SyncEngine<A>, ForceSyncError> {
535 let store = self
536 .postgres
537 .ok_or(ForceSyncError::MissingConfiguration { field: "postgres" })?;
538 if self.objects.is_empty() {
539 return Err(ForceSyncError::MissingConfiguration {
540 field: "object sync",
541 });
542 }
543
544 Ok(SyncEngine {
545 store,
546 salesforce: SalesforceApplier::new(self.salesforce_client),
547 objects: self
548 .objects
549 .into_iter()
550 .map(|object| (object.object_name().to_owned(), object))
551 .collect(),
552 capture_batch_size: self.capture_batch_size,
553 capture_priority: self.capture_priority,
554 apply_batch_size: self.apply_batch_size,
555 reconcile_batch_size: self.reconcile_batch_size,
556 lease_for: self.lease_for,
557 worker_id: self.worker_id,
558 })
559 }
560}
561
562fn build_apply_task_context(row: &tokio_postgres::Row) -> Result<ApplyTaskContext, ForceSyncError> {
563 let journal_id: i64 = row.get("journal_id");
564 let tenant: String = row.get("tenant");
565 let object_name: String = row.get("object_name");
566 let external_id: String = row.get("external_id");
567 let source: String = row.get("source");
568 let source_cursor: String = row.get("source_cursor");
569 let observed_at: DateTime<Utc> = row.get("observed_at");
570 let operation: String = row.get("operation");
571 let payload_json: String = row.get("payload_json");
572 let current_payload_json: Option<String> = row.get("current_payload_json");
573
574 let sync_key = SyncKey::new(tenant, object_name, external_id)?;
575 let payload = serde_json::from_str(&payload_json)?;
576 let current_payload = match current_payload_json {
577 Some(current_payload_json) => Some(serde_json::from_str(¤t_payload_json)?),
578 None => None,
579 };
580 let envelope = ChangeEnvelope::new(
581 sync_key,
582 parse_source_system(&source)?,
583 parse_change_operation(&operation)?,
584 observed_at,
585 payload,
586 )
587 .with_cursor(parse_source_cursor(&source_cursor)?);
588
589 Ok(ApplyTaskContext {
590 _journal_id: journal_id,
591 envelope,
592 current_payload,
593 })
594}
595
596fn should_project_locally(source: SourceSystem, lane: ApplyLane) -> bool {
597 source == SourceSystem::Salesforce && !matches!(lane, ApplyLane::Noop | ApplyLane::Conflict)
598}
599
600fn local_projection_salesforce_id(
601 existing_link: Option<&crate::store::pg::SyncLink>,
602 envelope: &ChangeEnvelope,
603) -> Result<Option<force::types::SalesforceId>, ForceSyncError> {
604 if let Some(existing_salesforce_id) =
605 existing_link.and_then(|link| link.salesforce_id.as_deref())
606 {
607 return force::types::SalesforceId::new(existing_salesforce_id.to_owned())
608 .map(Some)
609 .map_err(|error| ForceSyncError::InvalidStoredValue {
610 field: "salesforce_id",
611 value: error.to_string(),
612 });
613 }
614
615 let Some(payload_salesforce_id) = envelope.payload().get("Id").and_then(Value::as_str) else {
616 return Ok(None);
617 };
618
619 force::types::SalesforceId::new(payload_salesforce_id.to_owned())
620 .map(Some)
621 .map_err(|error| ForceSyncError::InvalidStoredValue {
622 field: "payload.Id",
623 value: error.to_string(),
624 })
625}
626
627fn parse_source_system(value: &str) -> Result<SourceSystem, ForceSyncError> {
628 match value {
629 "salesforce" => Ok(SourceSystem::Salesforce),
630 "postgres" => Ok(SourceSystem::Postgres),
631 _ => Err(ForceSyncError::InvalidStoredValue {
632 field: "source",
633 value: value.to_owned(),
634 }),
635 }
636}
637
638fn parse_change_operation(value: &str) -> Result<ChangeOperation, ForceSyncError> {
639 match value {
640 "upsert" => Ok(ChangeOperation::Upsert),
641 "delete" => Ok(ChangeOperation::Delete),
642 _ => Err(ForceSyncError::InvalidStoredValue {
643 field: "operation",
644 value: value.to_owned(),
645 }),
646 }
647}
648
649fn parse_source_cursor(value: &str) -> Result<SourceCursor, ForceSyncError> {
650 if let Some(replay_id) = value.strip_prefix("salesforce-replay-id:") {
651 let replay_id =
652 replay_id
653 .parse::<i64>()
654 .map_err(|_| ForceSyncError::InvalidStoredValue {
655 field: "source_cursor",
656 value: value.to_owned(),
657 })?;
658 return Ok(SourceCursor::SalesforceReplayId(replay_id));
659 }
660
661 if let Some(lsn) = value.strip_prefix("postgres-lsn:") {
662 return Ok(SourceCursor::PostgresLsn(lsn.to_owned()));
663 }
664
665 if let Some(snapshot) = value.strip_prefix("snapshot:") {
666 return Ok(SourceCursor::Snapshot(snapshot.to_owned()));
667 }
668
669 Err(ForceSyncError::InvalidStoredValue {
670 field: "source_cursor",
671 value: value.to_owned(),
672 })
673}
674
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use super::{
        local_projection_salesforce_id, parse_change_operation, parse_source_cursor,
        parse_source_system, should_project_locally,
    };
    use crate::{
        identity::SyncKey,
        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
        plan::ApplyLane,
        store::pg::SyncLink,
    };

    /// Builds a Salesforce-sourced upsert envelope with a fixed test sync key.
    fn envelope(payload: serde_json::Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            SyncKey::new("tenant", "Account", "external-1")
                .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}")),
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    #[test]
    fn should_not_project_local_noops_back_to_salesforce() {
        assert!(!should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Noop
        ));
        assert!(!should_project_locally(
            SourceSystem::Postgres,
            ApplyLane::Rest
        ));
        assert!(should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Rest
        ));
    }

    #[test]
    fn local_projection_salesforce_id_uses_payload_id_when_link_is_missing() {
        let envelope = envelope(json!({
            "Id": "001000000000009AAA",
            "Name": "Incoming Salesforce"
        }));

        let salesforce_id = local_projection_salesforce_id(None, &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));

        assert_eq!(
            salesforce_id
                .as_ref()
                .map(force::types::SalesforceId::as_str),
            Some("001000000000009AAA")
        );
    }

    #[test]
    fn local_projection_salesforce_id_prefers_existing_link() {
        let envelope = envelope(json!({
            "Id": "001000000000009AAA",
            "Name": "Incoming Salesforce"
        }));
        // Link carries a different Salesforce ID than the payload; the link
        // must win.
        let existing_link = SyncLink {
            tenant: "tenant".to_owned(),
            object_name: "Account".to_owned(),
            external_id: "external-1".to_owned(),
            salesforce_id: Some("001000000000001AAA".to_owned()),
            postgres_id: None,
            last_source: Some("postgres".to_owned()),
            last_source_cursor: Some("postgres-lsn:1".to_owned()),
            last_payload_hash: None,
            tombstone: false,
        };

        let salesforce_id = local_projection_salesforce_id(Some(&existing_link), &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));

        assert_eq!(
            salesforce_id
                .as_ref()
                .map(force::types::SalesforceId::as_str),
            Some("001000000000001AAA")
        );
    }

    #[test]
    fn local_projection_returns_none_when_no_link_and_no_payload_id() {
        let envelope = envelope(json!({"Name": "No Id Field"}));
        let salesforce_id = local_projection_salesforce_id(None, &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
        assert!(salesforce_id.is_none());
    }

    #[test]
    fn parse_source_system_salesforce() {
        assert_eq!(
            parse_source_system("salesforce").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            SourceSystem::Salesforce,
        );
    }

    #[test]
    fn parse_source_system_postgres() {
        assert_eq!(
            parse_source_system("postgres").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            SourceSystem::Postgres,
        );
    }

    #[test]
    fn parse_source_system_unknown_returns_error() {
        let Err(err) = parse_source_system("oracle") else {
            panic!("expected error for unknown source system");
        };
        assert!(err.to_string().contains("oracle"));
    }

    #[test]
    fn parse_change_operation_upsert() {
        assert_eq!(
            parse_change_operation("upsert").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            ChangeOperation::Upsert,
        );
    }

    #[test]
    fn parse_change_operation_delete() {
        assert_eq!(
            parse_change_operation("delete").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            ChangeOperation::Delete,
        );
    }

    #[test]
    fn parse_change_operation_unknown_returns_error() {
        let Err(err) = parse_change_operation("insert") else {
            panic!("expected error for unknown change operation");
        };
        assert!(err.to_string().contains("insert"));
    }

    #[test]
    fn parse_source_cursor_salesforce_replay_id() {
        let cursor = parse_source_cursor("salesforce-replay-id:42")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(cursor, SourceCursor::SalesforceReplayId(42));
    }

    #[test]
    fn parse_source_cursor_postgres_lsn() {
        let cursor = parse_source_cursor("postgres-lsn:0/16B3748")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(cursor, SourceCursor::PostgresLsn("0/16B3748".to_owned()));
    }

    #[test]
    fn parse_source_cursor_snapshot() {
        // Snapshot values may themselves contain colons (timestamps).
        let cursor = parse_source_cursor("snapshot:2024-01-01T00:00:00Z")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(
            cursor,
            SourceCursor::Snapshot("2024-01-01T00:00:00Z".to_owned())
        );
    }

    #[test]
    fn parse_source_cursor_invalid_replay_id_returns_error() {
        let Err(err) = parse_source_cursor("salesforce-replay-id:not-a-number") else {
            panic!("expected error for invalid replay ID");
        };
        assert!(err.to_string().contains("source_cursor"));
    }

    #[test]
    fn parse_source_cursor_unknown_prefix_returns_error() {
        let Err(err) = parse_source_cursor("kafka-offset:99") else {
            panic!("expected error for unknown cursor prefix");
        };
        assert!(err.to_string().contains("kafka-offset:99"));
    }

    #[test]
    fn should_project_locally_salesforce_bulk() {
        assert!(should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Bulk
        ));
    }

    #[test]
    fn should_not_project_locally_salesforce_conflict() {
        assert!(!should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Conflict
        ));
    }

    #[test]
    fn should_not_project_locally_postgres_bulk() {
        assert!(!should_project_locally(
            SourceSystem::Postgres,
            ApplyLane::Bulk
        ));
    }

    mod builder_tests {
        use async_trait::async_trait;
        use force::{
            auth::{AccessToken, Authenticator, TokenResponse},
            client::builder,
            error::Result as ForceResult,
        };

        use crate::{ObjectSync, SyncEngine, error::ForceSyncError};

        /// Authenticator stub that returns a canned token without any I/O.
        #[derive(Debug, Clone)]
        struct StubAuth;

        #[async_trait]
        impl Authenticator for StubAuth {
            async fn authenticate(&self) -> ForceResult<AccessToken> {
                Ok(AccessToken::from_response(TokenResponse {
                    access_token: "stub".to_owned(),
                    instance_url: "https://stub.salesforce.com".to_owned(),
                    token_type: "Bearer".to_owned(),
                    issued_at: "1704067200000".to_owned(),
                    signature: "stub".to_owned(),
                    expires_in: Some(7200),
                    refresh_token: None,
                }))
            }

            async fn refresh(&self) -> ForceResult<AccessToken> {
                self.authenticate().await
            }
        }

        #[tokio::test]
        async fn build_fails_without_postgres() {
            let client = builder()
                .authenticate(StubAuth)
                .build()
                .await
                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));

            let result = SyncEngine::builder(client)
                .object(ObjectSync::new("Account").external_id("ExternalId__c"))
                .build();

            let Err(err) = result else {
                panic!("expected error for missing postgres");
            };
            assert!(
                matches!(
                    err,
                    ForceSyncError::MissingConfiguration { field: "postgres" }
                ),
                "expected MissingConfiguration for postgres, got: {err}"
            );
        }

        #[tokio::test]
        async fn build_fails_with_empty_objects() {
            let client = builder()
                .authenticate(StubAuth)
                .build()
                .await
                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));

            // The pool is never connected; build validation happens before
            // any database access.
            let mut config = deadpool_postgres::Config::new();
            config.url = Some("postgresql://unused:unused@localhost:5432/unused".to_owned());
            let pool = config
                .create_pool(
                    Some(deadpool_postgres::Runtime::Tokio1),
                    tokio_postgres::NoTls,
                )
                .unwrap_or_else(|e| panic!("unexpected pool error: {e}"));
            let store = crate::PgStore::new(pool);

            let result = SyncEngine::builder(client).postgres(store).build();

            let Err(err) = result else {
                panic!("expected error for empty objects");
            };
            assert!(
                matches!(
                    err,
                    ForceSyncError::MissingConfiguration {
                        field: "object sync"
                    }
                ),
                "expected MissingConfiguration for object sync, got: {err}"
            );
        }
    }
}