1use std::{collections::HashMap, time::Duration};
4
5use chrono::{DateTime, Utc};
6use force::{auth::Authenticator, client::ForceClient};
7use serde_json::Value;
8
9use crate::{
10 apply::{
11 postgres::project_sync_link,
12 salesforce::{ApplyError, SalesforceApplier},
13 },
14 capture,
15 config::ObjectSync,
16 error::ForceSyncError,
17 identity::SyncKey,
18 model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
19 plan::{ApplyLane, PlannerContext, plan_change},
20 reconcile,
21 store::pg::{LeasedTask, PgStore, SyncConflict},
22};
23
/// Everything loaded for one leased apply task: the journal change to apply
/// and, when resolvable, the payload currently applied on the other side.
struct ApplyTaskContext {
    // Kept only for traceability; never read after construction.
    _journal_id: i64,
    // The change reconstructed from the journal row.
    envelope: ChangeEnvelope,
    // Journal payload whose hash matches the link's last applied hash, if any;
    // fed to the planner as the "current" state.
    current_payload: Option<Value>,
}
29
/// Drives the capture → apply → reconcile passes between Postgres and
/// Salesforce for a configured set of objects.
#[derive(Debug, Clone)]
pub struct SyncEngine<A: Authenticator> {
    // Postgres-backed journal / task / link store.
    store: PgStore,
    // REST and Bulk apply client for Salesforce.
    salesforce: SalesforceApplier<A>,
    // Object sync configuration keyed by object name.
    objects: HashMap<String, ObjectSync>,
    // Max changes read per capture pass.
    capture_batch_size: i64,
    // Priority assigned to captured tasks.
    capture_priority: i32,
    // Max tasks leased per apply pass.
    apply_batch_size: i64,
    // Batch size for each reconcile pass (clamped to >= 1 at call time).
    reconcile_batch_size: i64,
    // How long a leased task stays owned by this worker.
    lease_for: Duration,
    // Identifier used when leasing, acking, retrying, and failing tasks.
    worker_id: String,
}
43
/// Builder for [`SyncEngine`]; obtained via `SyncEngine::builder`.
///
/// `postgres` and at least one object sync are required; the remaining fields
/// carry defaults set in `new`.
#[derive(Debug)]
pub struct SyncEngineBuilder<A: Authenticator> {
    // Consumed by `build` to create the Salesforce applier.
    salesforce_client: ForceClient<A>,
    // Required; `build` fails without it.
    postgres: Option<PgStore>,
    // Required to be non-empty; keyed by object name at build time.
    objects: Vec<ObjectSync>,
    capture_batch_size: i64,
    capture_priority: i32,
    apply_batch_size: i64,
    reconcile_batch_size: i64,
    lease_for: Duration,
    worker_id: String,
}
57
58impl<A: Authenticator> SyncEngine<A> {
    /// Starts building a [`SyncEngine`] around the given Salesforce client.
    #[must_use]
    pub fn builder(salesforce_client: ForceClient<A>) -> SyncEngineBuilder<A> {
        SyncEngineBuilder::new(salesforce_client)
    }
64
    /// Runs one Postgres change-capture pass, returning the number of changes
    /// captured.
    ///
    /// # Errors
    /// Propagates any [`ForceSyncError`] raised by the capture step.
    pub async fn run_capture_postgres_once(&self) -> Result<usize, ForceSyncError> {
        capture::postgres::capture_batch(
            &self.store,
            self.capture_batch_size,
            self.capture_priority,
        )
        .await
    }
78
79 pub async fn run_apply_once(&self) -> Result<usize, ForceSyncError> {
85 let leased = self
86 .store
87 .lease_ready_tasks(&self.worker_id, self.apply_batch_size, self.lease_for)
88 .await?;
89 let batch_size = leased.len().max(1);
90 let mut applied = 0usize;
91
92 for task in leased {
93 if self.process_leased_task(&task, batch_size).await? {
94 applied += 1;
95 }
96 }
97
98 Ok(applied)
99 }
100
    /// Runs one reconcile pass, returning how many items were reconciled.
    /// The configured batch size is clamped to at least 1.
    pub async fn run_reconcile_once(&self) -> Result<usize, ForceSyncError> {
        reconcile::run_reconcile_once(&self.store, self.reconcile_batch_size.max(1)).await
    }
109
    /// Loads a leased task's journal context, plans the change, and applies it.
    ///
    /// Returns `Ok(true)` only when the change was applied and the task acked;
    /// `Ok(false)` when the task was failed, acked as a no-op, or skipped.
    async fn process_leased_task(
        &self,
        task: &LeasedTask,
        batch_size: usize,
    ) -> Result<bool, ForceSyncError> {
        let context = self.load_apply_task_context(task.task_id).await?;
        let Some(context) = context else {
            // The task no longer maps to a journal row; fail it so it is not
            // retried indefinitely.
            let _ = self
                .store
                .fail_task_for_worker(
                    &self.worker_id,
                    task.task_id,
                    "missing apply task journal row",
                )
                .await?;
            return Ok(false);
        };

        // A task for an object we were never configured to sync is an error,
        // not a per-task failure: the lease is left to expire.
        let object = self
            .objects
            .get(context.envelope.sync_key().object_name())
            .ok_or(ForceSyncError::MissingConfiguration {
                field: "object sync",
            })?;

        let existing_link = self
            .store
            .get_link(
                context.envelope.sync_key().tenant(),
                context.envelope.sync_key().object_name(),
                context.envelope.sync_key().external_id(),
            )
            .await?;

        let decision = plan_change(
            &PlannerContext {
                object: object.clone(),
                current_payload: context.current_payload.clone(),
                batch_size,
                // Small batches fit the REST lane's per-call limit and are
                // treated as urgent.
                urgent: batch_size <= object.lane_thresholds().rest_max_batch_size(),
                has_dependencies: false,
            },
            &context.envelope,
        );

        self.apply_planned_task(
            task,
            &context.envelope,
            existing_link.as_ref(),
            object,
            decision,
        )
        .await
    }
164
    /// Dispatches a planned change to its apply lane, acking, failing, or
    /// delegating the backing task accordingly.
    ///
    /// Returns `Ok(true)` only when the change was applied and acked.
    async fn apply_planned_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        existing_link: Option<&crate::store::pg::SyncLink>,
        object: &ObjectSync,
        decision: crate::plan::PlanDecision,
    ) -> Result<bool, ForceSyncError> {
        if decision.lane == ApplyLane::Conflict {
            // Persist one conflict row per conflicting field, then fail the
            // task so the conflict surfaces for resolution.
            for field_name in &decision.conflicts {
                let conflict = SyncConflict {
                    tenant: envelope.sync_key().tenant().to_owned(),
                    object_name: envelope.sync_key().object_name().to_owned(),
                    external_id: envelope.sync_key().external_id().to_owned(),
                    field_name: field_name.clone(),
                    // Field-level values are not captured here: both sides are
                    // recorded as null.
                    left_value: Value::Null,
                    right_value: Value::Null,
                    resolution: None,
                };
                self.store.insert_conflict(&conflict).await?;
            }
            let _ = self
                .store
                .fail_task_for_worker(
                    &self.worker_id,
                    task.task_id,
                    format!("planner conflict: {}", decision.conflicts.join(",")),
                )
                .await?;
            return Ok(false);
        }

        if decision.lane == ApplyLane::Noop {
            // Nothing to change; ack so the task is not retried.
            self.store
                .ack_task_for_worker(&self.worker_id, task.task_id)
                .await?;
            return Ok(false);
        }

        // Changes that originated in Salesforce only update the local link;
        // they are never written back to Salesforce.
        if should_project_locally(envelope.source(), decision.lane) {
            return self.apply_local_task(task, envelope, existing_link).await;
        }

        match decision.lane {
            ApplyLane::Rest => {
                // Prefer the planner-provided payload, falling back to the
                // envelope's raw payload.
                let payload = decision
                    .payload
                    .as_ref()
                    .unwrap_or_else(|| envelope.payload());
                let success = self
                    .apply_rest_task(task, envelope, payload, existing_link, object)
                    .await?;
                Ok(success)
            }
            ApplyLane::Bulk => {
                let payload = decision
                    .payload
                    .as_ref()
                    .unwrap_or_else(|| envelope.payload());
                let success = self
                    .apply_bulk_task(task, envelope, payload, existing_link, object)
                    .await?;
                Ok(success)
            }
            ApplyLane::CompositeGraph => {
                // Not implemented in this runtime; fail the task explicitly
                // instead of letting the lease expire.
                let _ = self
                    .store
                    .fail_task_for_worker(
                        &self.worker_id,
                        task.task_id,
                        format!("unsupported runtime lane: {:?}", decision.lane),
                    )
                    .await?;
                Ok(false)
            }
            ApplyLane::Conflict | ApplyLane::Noop => unreachable!("handled above"),
        }
    }
243
    /// Projects a Salesforce-originated change onto the local sync link
    /// (no call back into Salesforce), then acks the task.
    async fn apply_local_task(
        &self,
        task: &LeasedTask,
        envelope: &ChangeEnvelope,
        existing_link: Option<&crate::store::pg::SyncLink>,
    ) -> Result<bool, ForceSyncError> {
        let salesforce_id = local_projection_salesforce_id(existing_link, envelope)?;
        // The trailing flag marks the link as deleted for delete operations.
        let link = project_sync_link(
            existing_link,
            envelope,
            salesforce_id.as_ref(),
            matches!(envelope.operation(), ChangeOperation::Delete),
        );
        self.store.put_link(&link).await?;
        self.store
            .ack_task_for_worker(&self.worker_id, task.task_id)
            .await?;
        Ok(true)
    }
263
264 async fn apply_bulk_task(
265 &self,
266 task: &LeasedTask,
267 envelope: &ChangeEnvelope,
268 payload: &Value,
269 existing_link: Option<&crate::store::pg::SyncLink>,
270 object: &ObjectSync,
271 ) -> Result<bool, ForceSyncError> {
272 match envelope.operation() {
273 ChangeOperation::Upsert => {
274 let Some(existing_salesforce_id) =
275 existing_link.and_then(|link| link.salesforce_id.as_deref())
276 else {
277 return self
278 .apply_rest_task(task, envelope, payload, existing_link, object)
279 .await;
280 };
281
282 let job_result = self
283 .salesforce
284 .apply_bulk_upsert(
285 envelope.sync_key().object_name(),
286 object
287 .external_id_field()
288 .ok_or(ForceSyncError::MissingConfiguration {
289 field: "external_id_field",
290 })?,
291 1,
292 vec![payload.clone()],
293 )
294 .await;
295 if let Err(error) = job_result {
296 return self.handle_apply_error(task, error).await;
297 }
298
299 let Some(salesforce_id) =
300 force::types::SalesforceId::new(existing_salesforce_id.to_owned()).ok()
301 else {
302 return self
303 .apply_rest_task(task, envelope, payload, existing_link, object)
304 .await;
305 };
306
307 let link = project_sync_link(existing_link, envelope, Some(&salesforce_id), false);
308 self.store.put_link(&link).await?;
309 self.store
310 .ack_task_for_worker(&self.worker_id, task.task_id)
311 .await?;
312 Ok(true)
313 }
314 ChangeOperation::Delete => {
315 self.apply_rest_task(task, envelope, payload, existing_link, object)
316 .await
317 }
318 }
319 }
320
321 async fn apply_rest_task(
322 &self,
323 task: &LeasedTask,
324 envelope: &ChangeEnvelope,
325 payload: &Value,
326 existing_link: Option<&crate::store::pg::SyncLink>,
327 object: &ObjectSync,
328 ) -> Result<bool, ForceSyncError> {
329 match envelope.operation() {
330 ChangeOperation::Upsert => {
331 let result = self
332 .salesforce
333 .apply_rest_upsert(
334 envelope.sync_key().object_name(),
335 object
336 .external_id_field()
337 .ok_or(ForceSyncError::MissingConfiguration {
338 field: "external_id_field",
339 })?,
340 envelope.sync_key().external_id(),
341 payload,
342 )
343 .await;
344
345 match result {
346 Ok(result) => {
347 let link = project_sync_link(
348 existing_link,
349 envelope,
350 result.salesforce_id.as_ref(),
351 false,
352 );
353 self.store.put_link(&link).await?;
354 self.store
355 .ack_task_for_worker(&self.worker_id, task.task_id)
356 .await?;
357 Ok(true)
358 }
359 Err(error) => self.handle_apply_error(task, error).await,
360 }
361 }
362 ChangeOperation::Delete => {
363 let Some(existing_link) = existing_link else {
364 let _ = self
365 .store
366 .fail_task_for_worker(
367 &self.worker_id,
368 task.task_id,
369 "missing Salesforce ID for delete",
370 )
371 .await?;
372 return Ok(false);
373 };
374 let Some(salesforce_id) = existing_link.salesforce_id.as_deref() else {
375 let _ = self
376 .store
377 .fail_task_for_worker(
378 &self.worker_id,
379 task.task_id,
380 "missing Salesforce ID for delete",
381 )
382 .await?;
383 return Ok(false);
384 };
385 let salesforce_id = force::types::SalesforceId::new(salesforce_id.to_owned())
386 .map_err(|error| ForceSyncError::InvalidStoredValue {
387 field: "salesforce_id",
388 value: error.to_string(),
389 })?;
390
391 match self
392 .salesforce
393 .apply_rest_delete(envelope.sync_key().object_name(), &salesforce_id)
394 .await
395 {
396 Ok(()) => {
397 let link = project_sync_link(
398 existing_link.into(),
399 envelope,
400 Some(&salesforce_id),
401 true,
402 );
403 self.store.put_link(&link).await?;
404 self.store
405 .ack_task_for_worker(&self.worker_id, task.task_id)
406 .await?;
407 Ok(true)
408 }
409 Err(error) => self.handle_apply_error(task, error).await,
410 }
411 }
412 }
413 }
414
415 async fn handle_apply_error(
416 &self,
417 task: &LeasedTask,
418 error: ApplyError,
419 ) -> Result<bool, ForceSyncError> {
420 let error_message = error.to_string();
421 match error {
422 ApplyError::Retryable(_) => {
423 self.store
424 .retry_task_for_worker(
425 &self.worker_id,
426 task.task_id,
427 Utc::now() + chrono::Duration::seconds(30),
428 error_message,
429 )
430 .await?;
431 }
432 ApplyError::Permanent(_) => {
433 self.store
434 .fail_task_for_worker(&self.worker_id, task.task_id, error_message)
435 .await?;
436 }
437 }
438
439 Ok(false)
440 }
441
    /// Loads the journal envelope for a leased task plus, when resolvable, the
    /// payload currently applied (the journal row whose hash matches the
    /// link's last applied hash).
    ///
    /// Returns `Ok(None)` when the task no longer maps to a journal row.
    async fn load_apply_task_context(
        &self,
        task_id: i64,
    ) -> Result<Option<ApplyTaskContext>, ForceSyncError> {
        let client = self.store.pool().get().await?;
        // The lateral join finds the most recent journal row matching the
        // link's last_payload_hash; its payload is the "current" state fed to
        // the planner. No match (or no link) yields a null current payload.
        let row = client
            .query_opt(
                "select
                    j.journal_id,
                    j.tenant,
                    j.object_name,
                    j.external_id,
                    j.source,
                    j.source_cursor,
                    j.observed_at,
                    j.operation,
                    j.payload::text as payload_json,
                    applied.payload::text as current_payload_json
                from sync_task t
                join sync_journal j
                    on (t.payload->>'journal_id')::bigint = j.journal_id
                left join sync_link link
                    on link.tenant = j.tenant
                    and link.object_name = j.object_name
                    and link.external_id = j.external_id
                left join lateral (
                    select payload
                    from sync_journal applied
                    where applied.tenant = j.tenant
                    and applied.object_name = j.object_name
                    and applied.external_id = j.external_id
                    and link.last_payload_hash is not null
                    and applied.payload_hash = link.last_payload_hash
                    order by applied.journal_id desc
                    limit 1
                ) applied on true
                where t.task_id = $1",
                &[&task_id],
            )
            .await?;

        row.as_ref().map(build_apply_task_context).transpose()
    }
485}
486
487impl<A: Authenticator> SyncEngineBuilder<A> {
488 fn new(salesforce_client: ForceClient<A>) -> Self {
489 Self {
490 salesforce_client,
491 postgres: None,
492 objects: Vec::new(),
493 capture_batch_size: 100,
494 capture_priority: 0,
495 apply_batch_size: 1,
496 reconcile_batch_size: 100,
497 lease_for: Duration::from_secs(30),
498 worker_id: "force-sync-apply".to_owned(),
499 }
500 }
501
502 #[must_use]
504 pub fn postgres(mut self, postgres: PgStore) -> Self {
505 self.postgres = Some(postgres);
506 self
507 }
508
509 #[must_use]
511 pub fn object(mut self, object: ObjectSync) -> Self {
512 self.objects.push(object);
513 self
514 }
515
516 #[must_use]
518 pub const fn reconcile_batch_size(mut self, reconcile_batch_size: i64) -> Self {
519 self.reconcile_batch_size = reconcile_batch_size;
520 self
521 }
522
523 #[must_use]
525 pub const fn apply_batch_size(mut self, apply_batch_size: i64) -> Self {
526 self.apply_batch_size = apply_batch_size;
527 self
528 }
529
530 pub fn build(self) -> Result<SyncEngine<A>, ForceSyncError> {
536 let store = self
537 .postgres
538 .ok_or(ForceSyncError::MissingConfiguration { field: "postgres" })?;
539 if self.objects.is_empty() {
540 return Err(ForceSyncError::MissingConfiguration {
541 field: "object sync",
542 });
543 }
544
545 Ok(SyncEngine {
546 store,
547 salesforce: SalesforceApplier::new(self.salesforce_client),
548 objects: self
549 .objects
550 .into_iter()
551 .map(|object| (object.object_name().to_owned(), object))
552 .collect(),
553 capture_batch_size: self.capture_batch_size,
554 capture_priority: self.capture_priority,
555 apply_batch_size: self.apply_batch_size,
556 reconcile_batch_size: self.reconcile_batch_size,
557 lease_for: self.lease_for,
558 worker_id: self.worker_id,
559 })
560 }
561}
562
/// Reconstructs an [`ApplyTaskContext`] from the joined task/journal row
/// produced by `load_apply_task_context`.
///
/// # Errors
/// Fails when a stored value (sync key parts, source, operation, cursor, or
/// either JSON payload) cannot be parsed back into its domain type.
fn build_apply_task_context(row: &tokio_postgres::Row) -> Result<ApplyTaskContext, ForceSyncError> {
    let journal_id: i64 = row.get("journal_id");
    let tenant: String = row.get("tenant");
    let object_name: String = row.get("object_name");
    let external_id: String = row.get("external_id");
    let source: String = row.get("source");
    let source_cursor: String = row.get("source_cursor");
    let observed_at: DateTime<Utc> = row.get("observed_at");
    let operation: String = row.get("operation");
    let payload_json: String = row.get("payload_json");
    // Null when no applied journal row matched the link's payload hash.
    let current_payload_json: Option<String> = row.get("current_payload_json");

    let sync_key = SyncKey::new(tenant, object_name, external_id)?;
    let payload = serde_json::from_str(&payload_json)?;
    let current_payload = match current_payload_json {
        Some(current_payload_json) => Some(serde_json::from_str(&current_payload_json)?),
        None => None,
    };
    let envelope = ChangeEnvelope::new(
        sync_key,
        parse_source_system(&source)?,
        parse_change_operation(&operation)?,
        observed_at,
        payload,
    )
    .with_cursor(parse_source_cursor(&source_cursor)?);

    Ok(ApplyTaskContext {
        _journal_id: journal_id,
        envelope,
        current_payload,
    })
}
596
597fn should_project_locally(source: SourceSystem, lane: ApplyLane) -> bool {
598 source == SourceSystem::Salesforce && !matches!(lane, ApplyLane::Noop | ApplyLane::Conflict)
599}
600
601fn local_projection_salesforce_id(
602 existing_link: Option<&crate::store::pg::SyncLink>,
603 envelope: &ChangeEnvelope,
604) -> Result<Option<force::types::SalesforceId>, ForceSyncError> {
605 if let Some(existing_salesforce_id) =
606 existing_link.and_then(|link| link.salesforce_id.as_deref())
607 {
608 return force::types::SalesforceId::new(existing_salesforce_id.to_owned())
609 .map(Some)
610 .map_err(|error| ForceSyncError::InvalidStoredValue {
611 field: "salesforce_id",
612 value: error.to_string(),
613 });
614 }
615
616 let Some(payload_salesforce_id) = envelope.payload().get("Id").and_then(Value::as_str) else {
617 return Ok(None);
618 };
619
620 force::types::SalesforceId::new(payload_salesforce_id.to_owned())
621 .map(Some)
622 .map_err(|error| ForceSyncError::InvalidStoredValue {
623 field: "payload.Id",
624 value: error.to_string(),
625 })
626}
627
628fn parse_source_system(value: &str) -> Result<SourceSystem, ForceSyncError> {
629 match value {
630 "salesforce" => Ok(SourceSystem::Salesforce),
631 "postgres" => Ok(SourceSystem::Postgres),
632 _ => Err(ForceSyncError::InvalidStoredValue {
633 field: "source",
634 value: value.to_owned(),
635 }),
636 }
637}
638
639fn parse_change_operation(value: &str) -> Result<ChangeOperation, ForceSyncError> {
640 match value {
641 "upsert" => Ok(ChangeOperation::Upsert),
642 "delete" => Ok(ChangeOperation::Delete),
643 _ => Err(ForceSyncError::InvalidStoredValue {
644 field: "operation",
645 value: value.to_owned(),
646 }),
647 }
648}
649
650fn parse_source_cursor(value: &str) -> Result<SourceCursor, ForceSyncError> {
651 if let Some(replay_id) = value.strip_prefix("salesforce-replay-id:") {
652 let replay_id =
653 replay_id
654 .parse::<i64>()
655 .map_err(|_| ForceSyncError::InvalidStoredValue {
656 field: "source_cursor",
657 value: value.to_owned(),
658 })?;
659 return Ok(SourceCursor::SalesforceReplayId(replay_id));
660 }
661
662 if let Some(lsn) = value.strip_prefix("postgres-lsn:") {
663 return Ok(SourceCursor::PostgresLsn(lsn.to_owned()));
664 }
665
666 if let Some(snapshot) = value.strip_prefix("snapshot:") {
667 return Ok(SourceCursor::Snapshot(snapshot.to_owned()));
668 }
669
670 Err(ForceSyncError::InvalidStoredValue {
671 field: "source_cursor",
672 value: value.to_owned(),
673 })
674}
675
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use super::{
        local_projection_salesforce_id, parse_change_operation, parse_source_cursor,
        parse_source_system, should_project_locally,
    };
    use crate::{
        identity::SyncKey,
        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
        plan::ApplyLane,
        store::pg::SyncLink,
    };

    /// Builds a Salesforce-originated upsert envelope for a fixed
    /// tenant/object/external-id triple.
    fn envelope(payload: serde_json::Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            SyncKey::new("tenant", "Account", "external-1")
                .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}")),
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    // --- should_project_locally / local_projection_salesforce_id ----------

    #[test]
    fn should_not_project_local_noops_back_to_salesforce() {
        assert!(!should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Noop
        ));
        assert!(!should_project_locally(
            SourceSystem::Postgres,
            ApplyLane::Rest
        ));
        assert!(should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Rest
        ));
    }

    #[test]
    fn local_projection_salesforce_id_uses_payload_id_when_link_is_missing() {
        let envelope = envelope(json!({
            "Id": "001000000000009AAA",
            "Name": "Incoming Salesforce"
        }));

        let salesforce_id = local_projection_salesforce_id(None, &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));

        assert_eq!(
            salesforce_id
                .as_ref()
                .map(force::types::SalesforceId::as_str),
            Some("001000000000009AAA")
        );
    }

    #[test]
    fn local_projection_salesforce_id_prefers_existing_link() {
        let envelope = envelope(json!({
            "Id": "001000000000009AAA",
            "Name": "Incoming Salesforce"
        }));
        let existing_link = SyncLink {
            tenant: "tenant".to_owned(),
            object_name: "Account".to_owned(),
            external_id: "external-1".to_owned(),
            salesforce_id: Some("001000000000001AAA".to_owned()),
            postgres_id: None,
            last_source: Some("postgres".to_owned()),
            last_source_cursor: Some("postgres-lsn:1".to_owned()),
            last_payload_hash: None,
            tombstone: false,
        };

        let salesforce_id = local_projection_salesforce_id(Some(&existing_link), &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));

        // The link's stored ID wins over the payload's Id field.
        assert_eq!(
            salesforce_id
                .as_ref()
                .map(force::types::SalesforceId::as_str),
            Some("001000000000001AAA")
        );
    }

    #[test]
    fn local_projection_returns_none_when_no_link_and_no_payload_id() {
        let envelope = envelope(json!({"Name": "No Id Field"}));
        let salesforce_id = local_projection_salesforce_id(None, &envelope)
            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
        assert!(salesforce_id.is_none());
    }

    // --- parse_source_system ----------------------------------------------

    #[test]
    fn parse_source_system_salesforce() {
        assert_eq!(
            parse_source_system("salesforce").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            SourceSystem::Salesforce,
        );
    }

    #[test]
    fn parse_source_system_postgres() {
        assert_eq!(
            parse_source_system("postgres").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            SourceSystem::Postgres,
        );
    }

    #[test]
    fn parse_source_system_unknown_returns_error() {
        let Err(err) = parse_source_system("oracle") else {
            panic!("expected error for unknown source system");
        };
        assert!(err.to_string().contains("oracle"));
    }

    // --- parse_change_operation -------------------------------------------

    #[test]
    fn parse_change_operation_upsert() {
        assert_eq!(
            parse_change_operation("upsert").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            ChangeOperation::Upsert,
        );
    }

    #[test]
    fn parse_change_operation_delete() {
        assert_eq!(
            parse_change_operation("delete").unwrap_or_else(|e| panic!("unexpected error: {e}")),
            ChangeOperation::Delete,
        );
    }

    #[test]
    fn parse_change_operation_unknown_returns_error() {
        let Err(err) = parse_change_operation("insert") else {
            panic!("expected error for unknown change operation");
        };
        assert!(err.to_string().contains("insert"));
    }

    // --- parse_source_cursor ----------------------------------------------

    #[test]
    fn parse_source_cursor_salesforce_replay_id() {
        let cursor = parse_source_cursor("salesforce-replay-id:42")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(cursor, SourceCursor::SalesforceReplayId(42));
    }

    #[test]
    fn parse_source_cursor_postgres_lsn() {
        let cursor = parse_source_cursor("postgres-lsn:0/16B3748")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(cursor, SourceCursor::PostgresLsn("0/16B3748".to_owned()));
    }

    #[test]
    fn parse_source_cursor_snapshot() {
        // Snapshot values contain colons; only the first colon is a separator.
        let cursor = parse_source_cursor("snapshot:2024-01-01T00:00:00Z")
            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
        assert_eq!(
            cursor,
            SourceCursor::Snapshot("2024-01-01T00:00:00Z".to_owned())
        );
    }

    #[test]
    fn parse_source_cursor_invalid_replay_id_returns_error() {
        let Err(err) = parse_source_cursor("salesforce-replay-id:not-a-number") else {
            panic!("expected error for invalid replay ID");
        };
        assert!(err.to_string().contains("source_cursor"));
    }

    #[test]
    fn parse_source_cursor_unknown_prefix_returns_error() {
        let Err(err) = parse_source_cursor("kafka-offset:99") else {
            panic!("expected error for unknown cursor prefix");
        };
        assert!(err.to_string().contains("kafka-offset:99"));
    }

    // --- should_project_locally, remaining lane/source combinations --------

    #[test]
    fn should_project_locally_salesforce_bulk() {
        assert!(should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Bulk
        ));
    }

    #[test]
    fn should_not_project_locally_salesforce_conflict() {
        assert!(!should_project_locally(
            SourceSystem::Salesforce,
            ApplyLane::Conflict
        ));
    }

    #[test]
    fn should_not_project_locally_postgres_bulk() {
        assert!(!should_project_locally(
            SourceSystem::Postgres,
            ApplyLane::Bulk
        ));
    }

    /// Builder validation tests; these use a stub authenticator so no network
    /// or real Salesforce credentials are involved.
    mod builder_tests {
        use async_trait::async_trait;
        use force::{
            auth::{AccessToken, Authenticator, TokenResponse},
            client::builder,
            error::Result as ForceResult,
        };

        use crate::{config::ObjectSync, error::ForceSyncError, runtime::SyncEngine};

        /// Authenticator that returns a canned token without any I/O.
        #[derive(Debug, Clone)]
        struct StubAuth;

        #[async_trait]
        impl Authenticator for StubAuth {
            async fn authenticate(&self) -> ForceResult<AccessToken> {
                Ok(AccessToken::from_response(TokenResponse {
                    access_token: "stub".to_owned(),
                    instance_url: "https://stub.salesforce.com".to_owned(),
                    token_type: "Bearer".to_owned(),
                    issued_at: "1704067200000".to_owned(),
                    signature: "stub".to_owned(),
                    expires_in: Some(7200),
                    refresh_token: None,
                }))
            }

            async fn refresh(&self) -> ForceResult<AccessToken> {
                self.authenticate().await
            }
        }

        #[tokio::test]
        async fn build_fails_without_postgres() {
            let client = builder()
                .authenticate(StubAuth)
                .build()
                .await
                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));

            let result = SyncEngine::builder(client)
                .object(ObjectSync::new("Account").external_id("ExternalId__c"))
                .build();

            let Err(err) = result else {
                panic!("expected error for missing postgres");
            };
            assert!(
                matches!(
                    err,
                    ForceSyncError::MissingConfiguration { field: "postgres" }
                ),
                "expected MissingConfiguration for postgres, got: {err}"
            );
        }

        #[tokio::test]
        async fn build_fails_with_empty_objects() {
            let client = builder()
                .authenticate(StubAuth)
                .build()
                .await
                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));

            // The pool is never connected; build() fails before any query.
            let mut config = deadpool_postgres::Config::new();
            config.url = Some("postgresql://unused:unused@localhost:5432/unused".to_owned());
            let pool = config
                .create_pool(
                    Some(deadpool_postgres::Runtime::Tokio1),
                    tokio_postgres::NoTls,
                )
                .unwrap_or_else(|e| panic!("unexpected pool error: {e}"));
            let store = crate::store::pg::PgStore::new(pool);

            let result = SyncEngine::builder(client).postgres(store).build();

            let Err(err) = result else {
                panic!("expected error for empty objects");
            };
            assert!(
                matches!(
                    err,
                    ForceSyncError::MissingConfiguration {
                        field: "object sync"
                    }
                ),
                "expected MissingConfiguration for object sync, got: {err}"
            );
        }
    }
}