1use serde_json::{Map, Value};
4
5use crate::config::{ObjectSync, Owner};
6use crate::model::{ChangeEnvelope, SourceSystem, payload_hash};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum ApplyLane {
11 Noop,
13 Conflict,
15 Rest,
17 CompositeGraph,
19 Bulk,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct PlannerContext {
26 pub object: ObjectSync,
28 pub current_payload: Option<Value>,
30 pub batch_size: usize,
32 pub urgent: bool,
34 pub has_dependencies: bool,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct PlanDecision {
41 pub lane: ApplyLane,
43 pub payload: Option<Value>,
45 pub conflicts: Vec<String>,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum MergeOutcome {
52 Noop,
54 Merged(Value),
56 Conflict {
58 fields: Vec<String>,
60 },
61}
62
63#[must_use]
65pub fn merge_payload(
66 object: &ObjectSync,
67 current_payload: Option<&Value>,
68 source: SourceSystem,
69 incoming_payload: &Value,
70) -> MergeOutcome {
71 let Some(current_payload) = current_payload else {
72 return MergeOutcome::Merged(incoming_payload.clone());
73 };
74
75 if payload_hash(current_payload) == payload_hash(incoming_payload) {
76 return MergeOutcome::Noop;
77 }
78
79 match (current_payload, incoming_payload) {
80 (Value::Object(current), Value::Object(incoming)) => {
81 merge_object_payload(object, current, source, incoming)
82 }
83 _ if current_payload == incoming_payload => MergeOutcome::Noop,
84 _ => MergeOutcome::Conflict {
85 fields: vec!["<root>".to_string()],
86 },
87 }
88}
89
90#[must_use]
92pub fn plan_change(context: &PlannerContext, envelope: &ChangeEnvelope) -> PlanDecision {
93 if context
94 .current_payload
95 .as_ref()
96 .is_some_and(|current| envelope.payload_hash_matches(current))
97 {
98 return PlanDecision {
99 lane: ApplyLane::Noop,
100 payload: None,
101 conflicts: Vec::new(),
102 };
103 }
104
105 match merge_payload(
106 &context.object,
107 context.current_payload.as_ref(),
108 envelope.source(),
109 envelope.payload(),
110 ) {
111 MergeOutcome::Noop => PlanDecision {
112 lane: ApplyLane::Noop,
113 payload: None,
114 conflicts: Vec::new(),
115 },
116 MergeOutcome::Conflict { fields } => PlanDecision {
117 lane: ApplyLane::Conflict,
118 payload: None,
119 conflicts: fields,
120 },
121 MergeOutcome::Merged(payload) => PlanDecision {
122 lane: if context
123 .current_payload
124 .as_ref()
125 .is_some_and(|current| current == &payload)
126 {
127 ApplyLane::Noop
128 } else {
129 choose_lane(context)
130 },
131 payload: if context
132 .current_payload
133 .as_ref()
134 .is_some_and(|current| current == &payload)
135 {
136 None
137 } else {
138 Some(payload)
139 },
140 conflicts: Vec::new(),
141 },
142 }
143}
144
145const fn choose_lane(context: &PlannerContext) -> ApplyLane {
146 if context.has_dependencies {
147 return ApplyLane::CompositeGraph;
148 }
149
150 if context.urgent {
151 return ApplyLane::Rest;
152 }
153
154 if context.batch_size >= context.object.lane_thresholds().bulk_min_batch_size() {
155 ApplyLane::Bulk
156 } else {
157 ApplyLane::Rest
158 }
159}
160
161fn merge_object_payload(
162 object: &ObjectSync,
163 current: &Map<String, Value>,
164 source: SourceSystem,
165 incoming: &Map<String, Value>,
166) -> MergeOutcome {
167 let mut merged: Option<Map<String, Value>> = None;
170 let mut conflicts = Vec::new();
171
172 for (field, incoming_value) in incoming {
173 match current.get(field) {
174 Some(existing_value) if existing_value == incoming_value => {}
175 Some(_) => match object.field_owner_for(field) {
176 Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
177 merged
178 .get_or_insert_with(|| current.clone())
179 .insert(field.clone(), incoming_value.clone());
180 }
181 Some(Owner::Postgres) if source == SourceSystem::Postgres => {
182 merged
183 .get_or_insert_with(|| current.clone())
184 .insert(field.clone(), incoming_value.clone());
185 }
186 Some(Owner::Salesforce | Owner::Postgres) => {}
187 Some(Owner::Shared) | None => conflicts.push(field.clone()),
188 },
189 None => match object.field_owner_for(field) {
190 Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
191 merged
192 .get_or_insert_with(|| current.clone())
193 .insert(field.clone(), incoming_value.clone());
194 }
195 Some(Owner::Postgres) if source == SourceSystem::Postgres => {
196 merged
197 .get_or_insert_with(|| current.clone())
198 .insert(field.clone(), incoming_value.clone());
199 }
200 Some(Owner::Salesforce | Owner::Postgres) => {}
201 Some(Owner::Shared) | None => {
202 merged
203 .get_or_insert_with(|| current.clone())
204 .insert(field.clone(), incoming_value.clone());
205 }
206 },
207 }
208 }
209
210 let merged = merged.unwrap_or_else(|| current.clone());
211
212 if !conflicts.is_empty() {
213 return MergeOutcome::Conflict { fields: conflicts };
214 }
215
216 MergeOutcome::Merged(Value::Object(merged))
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use crate::identity::SyncKey;
223 use crate::model::{ChangeEnvelope, ChangeOperation, SourceSystem};
224 use chrono::Utc;
225 use serde_json::json;
226
227 fn test_object_sync() -> ObjectSync {
228 ObjectSync::new("Account")
229 }
230
231 fn test_object_sync_with_ownership() -> ObjectSync {
232 ObjectSync::new("Account")
233 .field_owner("Name", Owner::Salesforce)
234 .field_owner("Billing", Owner::Postgres)
235 .field_owner("Phone", Owner::Shared)
236 }
237
238 fn test_envelope(
239 source: SourceSystem,
240 operation: ChangeOperation,
241 payload: Value,
242 ) -> ChangeEnvelope {
243 let Ok(sync_key) = SyncKey::new("tenant", "Account", "ext-001") else {
244 panic!("failed to create test sync key");
245 };
246 ChangeEnvelope::new(sync_key, source, operation, Utc::now(), payload)
247 }
248
249 fn test_context(object: &ObjectSync, current_payload: Option<Value>) -> PlannerContext {
250 PlannerContext {
251 object: object.clone(),
252 current_payload,
253 batch_size: 1,
254 urgent: false,
255 has_dependencies: false,
256 }
257 }
258
259 #[test]
262 fn merge_payload_no_current_returns_incoming_as_merged() {
263 let object = test_object_sync();
264 let incoming = json!({"Name": "Acme"});
265
266 let result = merge_payload(&object, None, SourceSystem::Salesforce, &incoming);
267 assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
268 }
269
270 #[test]
271 fn merge_payload_identical_payloads_returns_noop() {
272 let object = test_object_sync();
273 let current = json!({"Name": "Acme"});
274 let incoming = json!({"Name": "Acme"});
275
276 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
277 assert_eq!(result, MergeOutcome::Noop);
278 }
279
280 #[test]
281 fn merge_payload_non_object_mismatch_returns_conflict() {
282 let object = test_object_sync();
283 let current = json!("string_a");
284 let incoming = json!("string_b");
285
286 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
287 assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["<root>"]));
288 }
289
290 #[test]
291 fn merge_payload_non_object_identical_returns_noop() {
292 let object = test_object_sync();
293 let current = json!(42);
294 let incoming = json!(42);
295
296 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
297 assert_eq!(result, MergeOutcome::Noop);
298 }
299
300 #[test]
301 fn merge_payload_new_field_no_ownership_is_added() {
302 let object = test_object_sync();
303 let current = json!({"Name": "Acme"});
304 let incoming = json!({"Name": "Acme", "Phone": "555-1234"});
305
306 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
307 assert_eq!(
308 result,
309 MergeOutcome::Merged(json!({"Name": "Acme", "Phone": "555-1234"}))
310 );
311 }
312
313 #[test]
314 fn merge_payload_salesforce_owned_field_accepted_from_salesforce() {
315 let object = test_object_sync_with_ownership();
316 let current = json!({"Name": "Old"});
317 let incoming = json!({"Name": "New"});
318
319 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
320 assert_eq!(result, MergeOutcome::Merged(json!({"Name": "New"})));
321 }
322
323 #[test]
324 fn merge_payload_salesforce_owned_field_rejected_from_postgres() {
325 let object = test_object_sync_with_ownership();
326 let current = json!({"Name": "Acme"});
327 let incoming = json!({"Name": "Changed"});
328
329 let result = merge_payload(&object, Some(¤t), SourceSystem::Postgres, &incoming);
330 assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
331 }
332
333 #[test]
334 fn merge_payload_shared_field_conflict_on_change() {
335 let object = test_object_sync_with_ownership();
336 let current = json!({"Phone": "111"});
337 let incoming = json!({"Phone": "222"});
338
339 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
340 assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["Phone"]));
341 }
342
343 #[test]
344 fn merge_payload_null_field_in_incoming_with_ownership() {
345 let object = ObjectSync::new("Account")
346 .field_owner("Name", Owner::Salesforce)
347 .field_owner("Phone", Owner::Salesforce);
348 let current = json!({"Name": "Acme", "Phone": "555"});
349 let incoming = json!({"Name": "Acme", "Phone": null});
350
351 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
352 assert_eq!(
353 result,
354 MergeOutcome::Merged(json!({"Name": "Acme", "Phone": null}))
355 );
356 }
357
358 #[test]
359 fn merge_payload_empty_incoming_object_returns_current() {
360 let object = test_object_sync();
361 let current = json!({"Name": "Acme"});
362 let incoming = json!({});
363
364 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
365 assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
366 }
367
368 #[test]
369 fn merge_payload_postgres_owned_field_accepted_from_postgres() {
370 let object = test_object_sync_with_ownership();
371 let current = json!({"Billing": "Old Address"});
372 let incoming = json!({"Billing": "New Address"});
373
374 let result = merge_payload(&object, Some(¤t), SourceSystem::Postgres, &incoming);
375 assert_eq!(
376 result,
377 MergeOutcome::Merged(json!({"Billing": "New Address"}))
378 );
379 }
380
381 #[test]
382 fn merge_payload_postgres_owned_field_rejected_from_salesforce() {
383 let object = test_object_sync_with_ownership();
384 let current = json!({"Billing": "Original"});
385 let incoming = json!({"Billing": "Overwritten"});
386
387 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
388 assert_eq!(result, MergeOutcome::Merged(json!({"Billing": "Original"})));
389 }
390
391 #[test]
392 fn merge_payload_new_field_salesforce_owned_added_from_salesforce() {
393 let object = test_object_sync_with_ownership();
394 let current = json!({});
395 let incoming = json!({"Name": "Added"});
396
397 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
398 assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Added"})));
399 }
400
401 #[test]
402 fn merge_payload_new_field_salesforce_owned_ignored_from_postgres() {
403 let object = test_object_sync_with_ownership();
404 let current = json!({});
405 let incoming = json!({"Name": "ShouldBeIgnored"});
406
407 let result = merge_payload(&object, Some(¤t), SourceSystem::Postgres, &incoming);
408 assert_eq!(result, MergeOutcome::Merged(json!({})));
409 }
410
411 #[test]
412 fn merge_payload_new_field_postgres_owned_ignored_from_salesforce() {
413 let object = test_object_sync_with_ownership();
414 let current = json!({});
415 let incoming = json!({"Billing": "ShouldBeIgnored"});
416
417 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
418 assert_eq!(result, MergeOutcome::Merged(json!({})));
419 }
420
421 #[test]
422 fn merge_payload_new_shared_field_is_added() {
423 let object = test_object_sync_with_ownership();
424 let current = json!({});
425 let incoming = json!({"Phone": "555-0100"});
426
427 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
428 assert_eq!(result, MergeOutcome::Merged(json!({"Phone": "555-0100"})));
429 }
430
431 #[test]
432 fn merge_payload_multiple_conflicts_collected() {
433 let object = ObjectSync::new("Account")
434 .field_owner("A", Owner::Shared)
435 .field_owner("B", Owner::Shared);
436 let current = json!({"A": 1, "B": 2});
437 let incoming = json!({"A": 10, "B": 20});
438
439 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
440 match result {
441 MergeOutcome::Conflict { fields } => {
442 assert!(fields.contains(&"A".to_string()));
443 assert!(fields.contains(&"B".to_string()));
444 assert_eq!(fields.len(), 2);
445 }
446 other => panic!("expected Conflict, got {other:?}"),
447 }
448 }
449
450 #[test]
451 fn merge_payload_null_field_in_incoming_no_ownership() {
452 let object = test_object_sync();
453 let current = json!({"Name": "Acme", "Phone": "555"});
454 let incoming = json!({"Name": "Acme", "Phone": null});
455
456 let result = merge_payload(&object, Some(¤t), SourceSystem::Salesforce, &incoming);
457 assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["Phone"]));
458 }
459
460 #[test]
463 fn plan_change_delete_no_current_routes_to_rest() {
464 let object = test_object_sync();
465 let envelope = test_envelope(SourceSystem::Salesforce, ChangeOperation::Delete, json!({}));
466 let ctx = test_context(&object, None);
467
468 let decision = plan_change(&ctx, &envelope);
469 assert_eq!(decision.lane, ApplyLane::Rest);
470 }
471
472 #[test]
473 fn plan_change_identical_payload_returns_noop() {
474 let object = test_object_sync();
475 let payload = json!({"Name": "Acme"});
476 let envelope = test_envelope(
477 SourceSystem::Salesforce,
478 ChangeOperation::Upsert,
479 payload.clone(),
480 );
481 let ctx = test_context(&object, Some(payload));
482
483 let decision = plan_change(&ctx, &envelope);
484 assert_eq!(decision.lane, ApplyLane::Noop);
485 assert!(decision.payload.is_none());
486 assert!(decision.conflicts.is_empty());
487 }
488
489 #[test]
490 fn plan_change_new_record_routes_to_rest() {
491 let object = test_object_sync();
492 let envelope = test_envelope(
493 SourceSystem::Postgres,
494 ChangeOperation::Upsert,
495 json!({"Name": "New"}),
496 );
497 let ctx = test_context(&object, None);
498
499 let decision = plan_change(&ctx, &envelope);
500 assert_eq!(decision.lane, ApplyLane::Rest);
501 assert_eq!(decision.payload, Some(json!({"Name": "New"})));
502 }
503
504 #[test]
505 fn plan_change_urgent_routes_to_rest() {
506 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
507 let envelope = test_envelope(
508 SourceSystem::Postgres,
509 ChangeOperation::Upsert,
510 json!({"Name": "Updated"}),
511 );
512 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
513 ctx.urgent = true;
514
515 let decision = plan_change(&ctx, &envelope);
516 assert_eq!(decision.lane, ApplyLane::Rest);
517 }
518
519 #[test]
520 fn plan_change_with_dependencies_routes_to_composite_graph() {
521 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
522 let envelope = test_envelope(
523 SourceSystem::Postgres,
524 ChangeOperation::Upsert,
525 json!({"Name": "Updated"}),
526 );
527 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
528 ctx.has_dependencies = true;
529
530 let decision = plan_change(&ctx, &envelope);
531 assert_eq!(decision.lane, ApplyLane::CompositeGraph);
532 }
533
534 #[test]
535 fn plan_change_large_batch_routes_to_bulk() {
536 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
537 let envelope = test_envelope(
538 SourceSystem::Postgres,
539 ChangeOperation::Upsert,
540 json!({"Name": "Updated"}),
541 );
542 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
543 ctx.batch_size = 1000;
544
545 let decision = plan_change(&ctx, &envelope);
546 assert_eq!(decision.lane, ApplyLane::Bulk);
547 }
548
549 #[test]
550 fn plan_change_conflict_returns_conflict_lane() {
551 let object = test_object_sync_with_ownership();
552 let envelope = test_envelope(
553 SourceSystem::Salesforce,
554 ChangeOperation::Upsert,
555 json!({"Phone": "new-value"}),
556 );
557 let ctx = test_context(&object, Some(json!({"Phone": "old-value"})));
558
559 let decision = plan_change(&ctx, &envelope);
560 assert_eq!(decision.lane, ApplyLane::Conflict);
561 assert!(decision.payload.is_none());
562 assert!(!decision.conflicts.is_empty());
563 }
564
565 #[test]
566 fn plan_change_merged_equals_current_returns_noop() {
567 let object = test_object_sync_with_ownership();
568 let current = json!({"Name": "Acme", "Billing": "123 Main"});
569 let incoming = json!({"Name": "Changed"});
570 let envelope = test_envelope(SourceSystem::Postgres, ChangeOperation::Upsert, incoming);
571 let ctx = test_context(&object, Some(current));
572
573 let decision = plan_change(&ctx, &envelope);
574 assert_eq!(decision.lane, ApplyLane::Noop);
575 assert!(decision.payload.is_none());
576 }
577
578 #[test]
579 fn plan_change_batch_below_bulk_threshold_routes_to_rest() {
580 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
581 let envelope = test_envelope(
582 SourceSystem::Postgres,
583 ChangeOperation::Upsert,
584 json!({"Name": "Updated"}),
585 );
586 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
587 ctx.batch_size = 499;
588
589 let decision = plan_change(&ctx, &envelope);
590 assert_eq!(decision.lane, ApplyLane::Rest);
591 }
592
593 #[test]
594 fn plan_change_batch_at_bulk_threshold_routes_to_bulk() {
595 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
596 let envelope = test_envelope(
597 SourceSystem::Postgres,
598 ChangeOperation::Upsert,
599 json!({"Name": "Updated"}),
600 );
601 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
602 ctx.batch_size = 500;
603
604 let decision = plan_change(&ctx, &envelope);
605 assert_eq!(decision.lane, ApplyLane::Bulk);
606 }
607
608 #[test]
609 fn plan_change_dependencies_take_priority_over_bulk() {
610 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
611 let envelope = test_envelope(
612 SourceSystem::Postgres,
613 ChangeOperation::Upsert,
614 json!({"Name": "Updated"}),
615 );
616 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
617 ctx.batch_size = 1000;
618 ctx.has_dependencies = true;
619
620 let decision = plan_change(&ctx, &envelope);
621 assert_eq!(decision.lane, ApplyLane::CompositeGraph);
622 }
623
624 #[test]
625 fn plan_change_dependencies_take_priority_over_urgent() {
626 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
627 let envelope = test_envelope(
628 SourceSystem::Postgres,
629 ChangeOperation::Upsert,
630 json!({"Name": "Updated"}),
631 );
632 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
633 ctx.urgent = true;
634 ctx.has_dependencies = true;
635
636 let decision = plan_change(&ctx, &envelope);
637 assert_eq!(decision.lane, ApplyLane::CompositeGraph);
638 }
639
640 #[test]
641 fn plan_change_urgent_takes_priority_over_bulk() {
642 let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
643 let envelope = test_envelope(
644 SourceSystem::Postgres,
645 ChangeOperation::Upsert,
646 json!({"Name": "Updated"}),
647 );
648 let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
649 ctx.batch_size = 1000;
650 ctx.urgent = true;
651
652 let decision = plan_change(&ctx, &envelope);
653 assert_eq!(decision.lane, ApplyLane::Rest);
654 }
655
656 #[test]
657 fn plan_change_semantically_equal_payload_returns_noop() {
658 let object = test_object_sync();
659 let current = json!({"A": 1, "B": 2});
660 let incoming = json!({"B": 2, "A": 1});
661 let envelope = test_envelope(SourceSystem::Postgres, ChangeOperation::Upsert, incoming);
662 let ctx = test_context(&object, Some(current));
663
664 let decision = plan_change(&ctx, &envelope);
665 assert_eq!(decision.lane, ApplyLane::Noop);
666 }
667}