Skip to main content

force_sync/
plan.rs

1//! Pure planner and merge logic for sync envelopes.
2
3use serde_json::{Map, Value};
4
5use crate::config::{ObjectSync, Owner};
6use crate::model::{ChangeEnvelope, SourceSystem, payload_hash};
7
/// Transport lane the planner assigns to a change.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyLane {
    /// Nothing is written: the incoming change already matches current state.
    Noop,
    /// Automatic resolution failed; an operator or a later pass must settle it.
    Conflict,
    /// Write the change via the Salesforce REST API.
    Rest,
    /// Write the change via Composite Graph, used for dependent records.
    CompositeGraph,
    /// Write the change via the bulk transport.
    Bulk,
}
22
23/// The pure inputs required to plan a change.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct PlannerContext {
26    /// Object-level sync configuration.
27    pub object: ObjectSync,
28    /// The current canonical payload, if one is already known.
29    pub current_payload: Option<Value>,
30    /// The number of records being planned together.
31    pub batch_size: usize,
32    /// Whether the record should favor the fast path over batch economics.
33    pub urgent: bool,
34    /// Whether the change depends on related records being applied together.
35    pub has_dependencies: bool,
36}
37
38/// The final planner decision after merge and lane selection.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct PlanDecision {
41    /// The selected apply lane.
42    pub lane: ApplyLane,
43    /// The merged payload to apply, if any.
44    pub payload: Option<Value>,
45    /// Fields that could not be resolved automatically.
46    pub conflicts: Vec<String>,
47}
48
49/// The result of merging payloads before lane selection.
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum MergeOutcome {
52    /// No material change was detected.
53    Noop,
54    /// The payload was merged successfully.
55    Merged(Value),
56    /// The merge could not be resolved automatically.
57    Conflict {
58        /// The fields that caused the conflict.
59        fields: Vec<String>,
60    },
61}
62
63/// Merges an incoming payload into the current payload using field ownership rules.
64#[must_use]
65pub fn merge_payload(
66    object: &ObjectSync,
67    current_payload: Option<&Value>,
68    source: SourceSystem,
69    incoming_payload: &Value,
70) -> MergeOutcome {
71    let Some(current_payload) = current_payload else {
72        return MergeOutcome::Merged(incoming_payload.clone());
73    };
74
75    if payload_hash(current_payload) == payload_hash(incoming_payload) {
76        return MergeOutcome::Noop;
77    }
78
79    match (current_payload, incoming_payload) {
80        (Value::Object(current), Value::Object(incoming)) => {
81            merge_object_payload(object, current, source, incoming)
82        }
83        _ if current_payload == incoming_payload => MergeOutcome::Noop,
84        _ => MergeOutcome::Conflict {
85            fields: vec!["<root>".to_string()],
86        },
87    }
88}
89
90/// Plans how a change should be applied after merge resolution.
91#[must_use]
92pub fn plan_change(context: &PlannerContext, envelope: &ChangeEnvelope) -> PlanDecision {
93    if context
94        .current_payload
95        .as_ref()
96        .is_some_and(|current| envelope.payload_hash_matches(current))
97    {
98        return PlanDecision {
99            lane: ApplyLane::Noop,
100            payload: None,
101            conflicts: Vec::new(),
102        };
103    }
104
105    match merge_payload(
106        &context.object,
107        context.current_payload.as_ref(),
108        envelope.source(),
109        envelope.payload(),
110    ) {
111        MergeOutcome::Noop => PlanDecision {
112            lane: ApplyLane::Noop,
113            payload: None,
114            conflicts: Vec::new(),
115        },
116        MergeOutcome::Conflict { fields } => PlanDecision {
117            lane: ApplyLane::Conflict,
118            payload: None,
119            conflicts: fields,
120        },
121        MergeOutcome::Merged(payload) => PlanDecision {
122            lane: if context
123                .current_payload
124                .as_ref()
125                .is_some_and(|current| current == &payload)
126            {
127                ApplyLane::Noop
128            } else {
129                choose_lane(context)
130            },
131            payload: if context
132                .current_payload
133                .as_ref()
134                .is_some_and(|current| current == &payload)
135            {
136                None
137            } else {
138                Some(payload)
139            },
140            conflicts: Vec::new(),
141        },
142    }
143}
144
145const fn choose_lane(context: &PlannerContext) -> ApplyLane {
146    if context.has_dependencies {
147        return ApplyLane::CompositeGraph;
148    }
149
150    if context.urgent {
151        return ApplyLane::Rest;
152    }
153
154    if context.batch_size >= context.object.lane_thresholds().bulk_min_batch_size() {
155        ApplyLane::Bulk
156    } else {
157        ApplyLane::Rest
158    }
159}
160
161fn merge_object_payload(
162    object: &ObjectSync,
163    current: &Map<String, Value>,
164    source: SourceSystem,
165    incoming: &Map<String, Value>,
166) -> MergeOutcome {
167    let mut merged = current.clone();
168    let mut conflicts = Vec::new();
169
170    for (field, incoming_value) in incoming {
171        match merged.get(field) {
172            Some(existing_value) if existing_value == incoming_value => {}
173            Some(_existing_value) => match object.field_owner_for(field) {
174                Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
175                    merged.insert(field.clone(), incoming_value.clone());
176                }
177                Some(Owner::Postgres) if source == SourceSystem::Postgres => {
178                    merged.insert(field.clone(), incoming_value.clone());
179                }
180                Some(Owner::Salesforce | Owner::Postgres) => {}
181                Some(Owner::Shared) | None => conflicts.push(field.clone()),
182            },
183            None => match object.field_owner_for(field) {
184                Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
185                    merged.insert(field.clone(), incoming_value.clone());
186                }
187                Some(Owner::Postgres) if source == SourceSystem::Postgres => {
188                    merged.insert(field.clone(), incoming_value.clone());
189                }
190                Some(Owner::Salesforce | Owner::Postgres) => {}
191                Some(Owner::Shared) | None => {
192                    merged.insert(field.clone(), incoming_value.clone());
193                }
194            },
195        }
196    }
197
198    if !conflicts.is_empty() {
199        return MergeOutcome::Conflict { fields: conflicts };
200    }
201
202    MergeOutcome::Merged(Value::Object(merged))
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::SyncKey;
    use crate::model::{ChangeEnvelope, ChangeOperation, SourceSystem};
    use chrono::Utc;
    use serde_json::json;

    /// Account object with no field ownership configured.
    fn plain_account() -> ObjectSync {
        ObjectSync::new("Account")
    }

    /// Account object with one field for each ownership kind.
    fn owned_account() -> ObjectSync {
        ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Billing", Owner::Postgres)
            .field_owner("Phone", Owner::Shared)
    }

    /// Builds an envelope for a fixed test sync key.
    fn envelope_for(
        source: SourceSystem,
        operation: ChangeOperation,
        payload: Value,
    ) -> ChangeEnvelope {
        match SyncKey::new("tenant", "Account", "ext-001") {
            Ok(key) => ChangeEnvelope::new(key, source, operation, Utc::now(), payload),
            Err(_) => panic!("failed to create test sync key"),
        }
    }

    /// Builds a single-record, non-urgent, dependency-free planner context.
    fn context_for(object: &ObjectSync, current_payload: Option<Value>) -> PlannerContext {
        PlannerContext {
            object: object.clone(),
            current_payload,
            batch_size: 1,
            urgent: false,
            has_dependencies: false,
        }
    }

    /// Merges `incoming` onto an existing `current` payload.
    fn merge_onto(
        object: &ObjectSync,
        current: Value,
        source: SourceSystem,
        incoming: Value,
    ) -> MergeOutcome {
        merge_payload(object, Some(&current), source, &incoming)
    }

    /// The conflict outcome expected for exactly the given field names, in order.
    fn conflict_on(fields: &[&str]) -> MergeOutcome {
        MergeOutcome::Conflict {
            fields: fields.iter().map(|field| (*field).to_string()).collect(),
        }
    }

    /// A Postgres-owned `Name` update against an existing record; lane tests
    /// tweak the returned context before planning.
    fn name_update_fixture() -> (PlannerContext, ChangeEnvelope) {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = envelope_for(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let context = context_for(&object, Some(json!({"Name": "Old"})));
        (context, envelope)
    }

    // ── merge_payload ─────────────────────────────────────────────────

    #[test]
    fn merge_payload_no_current_returns_incoming_as_merged() {
        let outcome = merge_payload(
            &plain_account(),
            None,
            SourceSystem::Salesforce,
            &json!({"Name": "Acme"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_identical_payloads_returns_noop() {
        let outcome = merge_onto(
            &plain_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme"}),
        );
        assert_eq!(outcome, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_non_object_mismatch_returns_conflict() {
        let outcome = merge_onto(
            &plain_account(),
            json!("string_a"),
            SourceSystem::Salesforce,
            json!("string_b"),
        );
        assert_eq!(outcome, conflict_on(&["<root>"]));
    }

    #[test]
    fn merge_payload_non_object_identical_returns_noop() {
        let outcome = merge_onto(
            &plain_account(),
            json!(42),
            SourceSystem::Salesforce,
            json!(42),
        );
        assert_eq!(outcome, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_new_field_no_ownership_is_added() {
        let outcome = merge_onto(
            &plain_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": "555-1234"}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": "555-1234"}))
        );
    }

    #[test]
    fn merge_payload_salesforce_owned_field_accepted_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Name": "Old"}),
            SourceSystem::Salesforce,
            json!({"Name": "New"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "New"})));
    }

    #[test]
    fn merge_payload_salesforce_owned_field_rejected_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Postgres,
            json!({"Name": "Changed"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_shared_field_conflict_on_change() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Phone": "111"}),
            SourceSystem::Salesforce,
            json!({"Phone": "222"}),
        );
        assert_eq!(outcome, conflict_on(&["Phone"]));
    }

    #[test]
    fn merge_payload_null_field_in_incoming_with_ownership() {
        let object = ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Phone", Owner::Salesforce);

        let outcome = merge_onto(
            &object,
            json!({"Name": "Acme", "Phone": "555"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": null}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": null}))
        );
    }

    #[test]
    fn merge_payload_empty_incoming_object_returns_current() {
        let outcome = merge_onto(
            &plain_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_postgres_owned_field_accepted_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Billing": "Old Address"}),
            SourceSystem::Postgres,
            json!({"Billing": "New Address"}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Billing": "New Address"}))
        );
    }

    #[test]
    fn merge_payload_postgres_owned_field_rejected_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Billing": "Original"}),
            SourceSystem::Salesforce,
            json!({"Billing": "Overwritten"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Billing": "Original"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_added_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Name": "Added"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Added"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_ignored_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Postgres,
            json!({"Name": "ShouldBeIgnored"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_field_postgres_owned_ignored_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Billing": "ShouldBeIgnored"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_shared_field_is_added() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Phone": "555-0100"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Phone": "555-0100"})));
    }

    #[test]
    fn merge_payload_multiple_conflicts_collected() {
        let object = ObjectSync::new("Account")
            .field_owner("A", Owner::Shared)
            .field_owner("B", Owner::Shared);

        let outcome = merge_onto(
            &object,
            json!({"A": 1, "B": 2}),
            SourceSystem::Salesforce,
            json!({"A": 10, "B": 20}),
        );
        match outcome {
            MergeOutcome::Conflict { mut fields } => {
                // The order conflicts are reported in is not part of the contract.
                fields.sort();
                assert_eq!(fields, ["A", "B"]);
            }
            other => panic!("expected Conflict, got {other:?}"),
        }
    }

    #[test]
    fn merge_payload_null_field_in_incoming_no_ownership() {
        let outcome = merge_onto(
            &plain_account(),
            json!({"Name": "Acme", "Phone": "555"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": null}),
        );
        assert_eq!(outcome, conflict_on(&["Phone"]));
    }

    // ── plan_change ───────────────────────────────────────────────────

    #[test]
    fn plan_change_delete_no_current_routes_to_rest() {
        let envelope = envelope_for(SourceSystem::Salesforce, ChangeOperation::Delete, json!({}));
        let context = context_for(&plain_account(), None);

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_identical_payload_returns_noop() {
        let payload = json!({"Name": "Acme"});
        let envelope = envelope_for(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            payload.clone(),
        );
        let context = context_for(&plain_account(), Some(payload));

        let decision = plan_change(&context, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert!(decision.payload.is_none());
        assert!(decision.conflicts.is_empty());
    }

    #[test]
    fn plan_change_new_record_routes_to_rest() {
        let envelope = envelope_for(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "New"}),
        );
        let context = context_for(&plain_account(), None);

        let decision = plan_change(&context, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
        assert_eq!(decision.payload, Some(json!({"Name": "New"})));
    }

    #[test]
    fn plan_change_urgent_routes_to_rest() {
        let (mut context, envelope) = name_update_fixture();
        context.urgent = true;

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_with_dependencies_routes_to_composite_graph() {
        let (mut context, envelope) = name_update_fixture();
        context.has_dependencies = true;

        assert_eq!(
            plan_change(&context, &envelope).lane,
            ApplyLane::CompositeGraph
        );
    }

    #[test]
    fn plan_change_large_batch_routes_to_bulk() {
        let (mut context, envelope) = name_update_fixture();
        context.batch_size = 1000;

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_conflict_returns_conflict_lane() {
        let envelope = envelope_for(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            json!({"Phone": "new-value"}),
        );
        let context = context_for(&owned_account(), Some(json!({"Phone": "old-value"})));

        let decision = plan_change(&context, &envelope);
        assert_eq!(decision.lane, ApplyLane::Conflict);
        assert!(decision.payload.is_none());
        assert!(!decision.conflicts.is_empty());
    }

    #[test]
    fn plan_change_merged_equals_current_returns_noop() {
        let envelope = envelope_for(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Changed"}),
        );
        let context = context_for(
            &owned_account(),
            Some(json!({"Name": "Acme", "Billing": "123 Main"})),
        );

        let decision = plan_change(&context, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert!(decision.payload.is_none());
    }

    #[test]
    fn plan_change_batch_below_bulk_threshold_routes_to_rest() {
        let (mut context, envelope) = name_update_fixture();
        context.batch_size = 499;

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_batch_at_bulk_threshold_routes_to_bulk() {
        let (mut context, envelope) = name_update_fixture();
        context.batch_size = 500;

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_bulk() {
        let (mut context, envelope) = name_update_fixture();
        context.batch_size = 1000;
        context.has_dependencies = true;

        assert_eq!(
            plan_change(&context, &envelope).lane,
            ApplyLane::CompositeGraph
        );
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_urgent() {
        let (mut context, envelope) = name_update_fixture();
        context.urgent = true;
        context.has_dependencies = true;

        assert_eq!(
            plan_change(&context, &envelope).lane,
            ApplyLane::CompositeGraph
        );
    }

    #[test]
    fn plan_change_urgent_takes_priority_over_bulk() {
        let (mut context, envelope) = name_update_fixture();
        context.batch_size = 1000;
        context.urgent = true;

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_semantically_equal_payload_returns_noop() {
        // Same fields, different key order in the literal.
        let envelope = envelope_for(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"B": 2, "A": 1}),
        );
        let context = context_for(&plain_account(), Some(json!({"A": 1, "B": 2})));

        assert_eq!(plan_change(&context, &envelope).lane, ApplyLane::Noop);
    }
}