// sinks_core/sink.rs

1use async_trait::async_trait;
2use schema_core::{GenericValue, IndexMapping, IndexName};
3
4use crate::Result;
5
6/// The outcome of a [`flush`](Sink::flush): which buffered documents the
7/// destination **rejected at the item level**.
8///
9/// This is distinct from a flush returning `Err`. An `Err` is a flush-wide
10/// failure (transport down, the whole request refused) — nothing in the batch
11/// is known durable, so the engine stops and the batch is redelivered. A
12/// `FlushReport` instead means the flush *succeeded* and the destination applied
13/// the batch, but rejected specific documents (a mapping conflict, a malformed
14/// value) while accepting the rest. Those rejections are the document's fault,
15/// not the destination's, so retrying redelivers the same poison — the engine
16/// handles them per its failure policy (stop, or quarantine and continue)
17/// instead of looping. An empty report means everything flushed cleanly.
18#[derive(Debug, Clone, Default)]
19pub struct FlushReport {
20    /// Documents the destination accepted the batch but rejected individually.
21    pub rejected: Vec<RejectedDocument>,
22}
23
24impl FlushReport {
25    /// A report with no rejections — everything in the flush was applied.
26    pub fn clean() -> Self {
27        Self::default()
28    }
29
30    /// Whether the flush applied every buffered document (no item-level
31    /// rejections).
32    pub fn is_clean(&self) -> bool {
33        self.rejected.is_empty()
34    }
35}
36
/// One document a sink rejected at the item level during a [`flush`](Sink::flush).
///
/// The names are the destination's own (e.g. an OpenSearch physical index), for
/// diagnostics and quarantine records. Two records compare equal when their
/// index, id and reason all match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RejectedDocument {
    /// The destination index the document was bound for.
    pub index: String,
    /// The document's id within that index (the search engine's `_id`).
    pub id: String,
    /// Why the destination rejected it.
    pub reason: String,
}
49
50/// A destination for assembled documents.
51///
52/// The engine calls [`upsert`](Self::upsert) / [`delete`](Self::delete) as it
53/// processes changes, then [`flush`](Self::flush) at a commit or batch boundary
54/// — so a buffering sink (e.g. OpenSearch bulk) can hold writes until then,
55/// while a streaming sink can write immediately and flush cheaply.
56///
57/// `id` is the document's identifier within the index (the search engine's
58/// `_id`); the engine derives it from the document's key.
59#[async_trait]
60pub trait Sink: std::fmt::Debug + Send + Sync {
61    /// Ensure the destination index exists, creating it from `mapping` if it is
62    /// absent. The engine calls this once per index at startup, before any
63    /// writes, so a sink that owns its index can pin field types up front
64    /// instead of letting the destination guess them. The default is a no-op —
65    /// correct for sinks with no schema-bound index (e.g. stdout).
66    async fn ensure_index(&self, _mapping: &IndexMapping) -> Result<()> {
67        Ok(())
68    }
69
70    /// Index (insert or replace) `document` under `id` in `index`.
71    async fn upsert(&self, index: &IndexName, id: &str, document: &GenericValue) -> Result<()>;
72
73    /// Remove the document `id` from `index`.
74    async fn delete(&self, index: &IndexName, id: &str) -> Result<()>;
75
76    /// Flush any buffered writes so everything written so far is durable.
77    ///
78    /// `caught_up` tells the sink the engine has drained the queue with this
79    /// batch — there is no backlog waiting behind it. A sink whose destination
80    /// has a cost to making writes *visible* (distinct from durable) can use
81    /// this to take that cost only when it's cheap: do it on a caught-up flush
82    /// (the pipeline is idle), skip it while a backlog is draining. Sinks with
83    /// no such distinction ignore it. See the OpenSearch sink, which forces an
84    /// index refresh only when `caught_up`.
85    ///
86    /// Returns a [`FlushReport`]: `Ok` with an empty report means every buffered
87    /// document was applied; `Ok` with rejections means the flush succeeded but
88    /// the destination refused specific documents (see [`FlushReport`] for why
89    /// that differs from an `Err`). `Err` is a flush-wide failure.
90    async fn flush(&self, caught_up: bool) -> Result<FlushReport>;
91
92    /// Whether `index` has already been seeded — its initial backfill completed
93    /// and durably applied here. The engine asks this at startup and skips the
94    /// backfill for indexes that report `true`.
95    ///
96    /// Seeded-state is destination knowledge, so it belongs to the sink: only
97    /// the sink knows whether its target already holds the data. The default is
98    /// `false` (never seeded) — correct for sinks that can't persist this, which
99    /// then re-seed on every run. Sinks that can store it (a metadata document,
100    /// a row, a sidecar) should override both methods.
101    async fn is_seeded(&self, _: &IndexName) -> Result<bool> {
102        Ok(false)
103    }
104
105    /// Record that `index` has been seeded, so a later run skips its backfill.
106    /// The default is a no-op (paired with `is_seeded` returning `false`).
107    async fn mark_seeded(&self, _: &IndexName) -> Result<()> {
108        Ok(())
109    }
110
111    /// Request a from-scratch rebuild of `index` on the next backfill: mark it
112    /// unseeded so [`is_seeded`](Self::is_seeded) reports `false` again, *without*
113    /// disturbing what currently serves reads. A sink that builds into a fresh,
114    /// swappable target (e.g. the OpenSearch sink's per-generation indexes behind
115    /// a stable alias) prepares that target here so the seeding path rebuilds it
116    /// and atomically swaps on completion — the live copy is untouched until then.
117    ///
118    /// This only flips the seeded-state and stages the target; the actual reseed
119    /// runs through the normal [`ensure_index`](Self::ensure_index) → backfill →
120    /// [`mark_seeded`](Self::mark_seeded) path on the next run. The default is a
121    /// no-op (correct for sinks that re-seed every run anyway). Takes the full
122    /// [`IndexMapping`] (not just the name) so a freshly-built sink can stage the
123    /// reindex without having run [`ensure_index`](Self::ensure_index) — it needs
124    /// the schema hash to address the index, not the running engine's state.
125    async fn reindex(&self, _: &IndexMapping) -> Result<()> {
126        Ok(())
127    }
128}