sinks_core/sink.rs
1use async_trait::async_trait;
2use schema_core::{GenericValue, IndexMapping, IndexName};
3
4use crate::Result;
5
/// The result of a successful [`flush`](Sink::flush): the buffered documents
/// the destination **rejected at the item level**.
///
/// This is not the same as `flush` returning `Err`. An `Err` is a flush-wide
/// failure (transport down, the whole request refused): nothing in the batch
/// is known to be durable, so the engine stops and the batch is redelivered.
/// A `FlushReport` means the flush itself *succeeded* — the destination
/// applied the batch — but it turned away specific documents (a mapping
/// conflict, a malformed value) while accepting the rest. Such a rejection is
/// the document's fault, not the destination's, so a retry would only
/// redeliver the same poison; the engine therefore handles it per its failure
/// policy (stop, or quarantine and continue) instead of looping.
///
/// An empty report means everything flushed cleanly.
///
/// Reports compare by value (`PartialEq`/`Eq`), so callers and tests can
/// check an outcome with `==` / `assert_eq!` rather than field by field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FlushReport {
    /// Documents the destination rejected individually while accepting the
    /// rest of the batch.
    pub rejected: Vec<RejectedDocument>,
}

impl FlushReport {
    /// Builds a report with no rejections — everything in the flush was
    /// applied. Equivalent to [`FlushReport::default`].
    #[must_use]
    pub fn clean() -> Self {
        Self::default()
    }

    /// Returns `true` when the flush applied every buffered document, i.e.
    /// there are no item-level rejections in this report.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.rejected.is_empty()
    }
}

/// One document a sink rejected at the item level during a
/// [`flush`](Sink::flush).
///
/// The names are the destination's own (e.g. an OpenSearch physical index),
/// kept for diagnostics and quarantine records. Two values are equal when
/// their `index`, `id` and `reason` all match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RejectedDocument {
    /// The destination index the document was bound for.
    pub index: String,
    /// The document's id within that index (the search engine's `_id`).
    pub id: String,
    /// Why the destination rejected it.
    pub reason: String,
}
49
50/// A destination for assembled documents.
51///
52/// The engine calls [`upsert`](Self::upsert) / [`delete`](Self::delete) as it
53/// processes changes, then [`flush`](Self::flush) at a commit or batch boundary
54/// — so a buffering sink (e.g. OpenSearch bulk) can hold writes until then,
55/// while a streaming sink can write immediately and flush cheaply.
56///
57/// `id` is the document's identifier within the index (the search engine's
58/// `_id`); the engine derives it from the document's key.
59#[async_trait]
60pub trait Sink: std::fmt::Debug + Send + Sync {
61 /// Ensure the destination index exists, creating it from `mapping` if it is
62 /// absent. The engine calls this once per index at startup, before any
63 /// writes, so a sink that owns its index can pin field types up front
64 /// instead of letting the destination guess them. The default is a no-op —
65 /// correct for sinks with no schema-bound index (e.g. stdout).
66 async fn ensure_index(&self, _mapping: &IndexMapping) -> Result<()> {
67 Ok(())
68 }
69
70 /// Index (insert or replace) `document` under `id` in `index`.
71 async fn upsert(&self, index: &IndexName, id: &str, document: &GenericValue) -> Result<()>;
72
73 /// Remove the document `id` from `index`.
74 async fn delete(&self, index: &IndexName, id: &str) -> Result<()>;
75
76 /// Flush any buffered writes so everything written so far is durable.
77 ///
78 /// `caught_up` tells the sink the engine has drained the queue with this
79 /// batch — there is no backlog waiting behind it. A sink whose destination
80 /// has a cost to making writes *visible* (distinct from durable) can use
81 /// this to take that cost only when it's cheap: do it on a caught-up flush
82 /// (the pipeline is idle), skip it while a backlog is draining. Sinks with
83 /// no such distinction ignore it. See the OpenSearch sink, which forces an
84 /// index refresh only when `caught_up`.
85 ///
86 /// Returns a [`FlushReport`]: `Ok` with an empty report means every buffered
87 /// document was applied; `Ok` with rejections means the flush succeeded but
88 /// the destination refused specific documents (see [`FlushReport`] for why
89 /// that differs from an `Err`). `Err` is a flush-wide failure.
90 async fn flush(&self, caught_up: bool) -> Result<FlushReport>;
91
92 /// Whether `index` has already been seeded — its initial backfill completed
93 /// and durably applied here. The engine asks this at startup and skips the
94 /// backfill for indexes that report `true`.
95 ///
96 /// Seeded-state is destination knowledge, so it belongs to the sink: only
97 /// the sink knows whether its target already holds the data. The default is
98 /// `false` (never seeded) — correct for sinks that can't persist this, which
99 /// then re-seed on every run. Sinks that can store it (a metadata document,
100 /// a row, a sidecar) should override both methods.
101 async fn is_seeded(&self, _: &IndexName) -> Result<bool> {
102 Ok(false)
103 }
104
105 /// Record that `index` has been seeded, so a later run skips its backfill.
106 /// The default is a no-op (paired with `is_seeded` returning `false`).
107 async fn mark_seeded(&self, _: &IndexName) -> Result<()> {
108 Ok(())
109 }
110
111 /// Request a from-scratch rebuild of `index` on the next backfill: mark it
112 /// unseeded so [`is_seeded`](Self::is_seeded) reports `false` again, *without*
113 /// disturbing what currently serves reads. A sink that builds into a fresh,
114 /// swappable target (e.g. the OpenSearch sink's per-generation indexes behind
115 /// a stable alias) prepares that target here so the seeding path rebuilds it
116 /// and atomically swaps on completion — the live copy is untouched until then.
117 ///
118 /// This only flips the seeded-state and stages the target; the actual reseed
119 /// runs through the normal [`ensure_index`](Self::ensure_index) → backfill →
120 /// [`mark_seeded`](Self::mark_seeded) path on the next run. The default is a
121 /// no-op (correct for sinks that re-seed every run anyway). Takes the full
122 /// [`IndexMapping`] (not just the name) so a freshly-built sink can stage the
123 /// reindex without having run [`ensure_index`](Self::ensure_index) — it needs
124 /// the schema hash to address the index, not the running engine's state.
125 async fn reindex(&self, _: &IndexMapping) -> Result<()> {
126 Ok(())
127 }
128}