engine/policy.rs
1//! Configuration vocabulary for a run: how changes are grouped into a flush
2//! ([`BatchPolicy`]) and how item-level sink rejections are handled
3//! ([`FailurePolicies`] over [`FailurePolicy`]).
4
5use std::collections::HashMap;
6use std::time::Duration;
7
8pub use schema_core::FailurePolicy;
9
/// Controls how the worker groups changes into a single sink flush.
///
/// Batching trades a little latency for far fewer round-trips: changes are
/// buffered until either `max_changes` of them have accumulated or `max_delay`
/// has elapsed, whichever comes first, and are then flushed together.
/// `max_changes: 1` reproduces the original flush-per-change behavior.
///
/// Acks respect the batch boundary — see the [module docs](crate). The source
/// ack for a change is confirmed only after the flush that made its documents
/// durable, so at-least-once delivery holds regardless of batch size.
///
/// The type is plain data: it is `Copy`, and two policies compare equal when
/// both limits match, which keeps configuration easy to assert on in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BatchPolicy {
    /// Flush once this many changes have accumulated. Treated as at least 1.
    ///
    /// NOTE(review): this struct stores the value as given; the clamp is
    /// presumably applied where the worker reads the policy — confirm there.
    pub max_changes: usize,
    /// Flush a partial batch this long after its first change, so a trickle of
    /// changes still lands promptly instead of waiting for a full batch.
    pub max_delay: Duration,
}
28
29impl Default for BatchPolicy {
30 fn default() -> Self {
31 Self {
32 max_changes: 256,
33 max_delay: Duration::from_millis(50),
34 }
35 }
36}
37
38/// How the engine resolves the [`FailurePolicy`] for a rejected document: a
39/// global `default` plus per-index overrides, keyed by **logical** index name.
40///
41/// The engine governs only *item-level rejections* (a sink accepted the batch
42/// but refused specific documents). Transport failures, a source decode error,
43/// or a flush returning `Err` always stop the run regardless of this.
44#[derive(Debug, Clone, Default)]
45pub struct FailurePolicies {
46 default: FailurePolicy,
47 overrides: HashMap<String, FailurePolicy>,
48}
49
50impl FailurePolicies {
51 /// A policy set with `default` applied to every index and no overrides.
52 pub fn new(default: FailurePolicy) -> Self {
53 Self {
54 default,
55 overrides: HashMap::new(),
56 }
57 }
58
59 /// Override the policy for one logical index, leaving others on the default.
60 pub fn with_override(mut self, index: impl Into<String>, policy: FailurePolicy) -> Self {
61 self.overrides.insert(index.into(), policy);
62 self
63 }
64
65 /// The effective policy for `index` (its override, else the default).
66 pub fn resolve(&self, index: &str) -> FailurePolicy {
67 self.overrides.get(index).copied().unwrap_or(self.default)
68 }
69}