Skip to main content

rustio_core/search/
indexer.rs

1//! Async indexer. Writes arrive on a bounded channel; a background
2//! task drains and bulk-indexes them.
3//!
4//! Why batching: Meilisearch takes ~1ms to index a single document and
5//! ~5ms to index a batch of 1000. If you POST every row the moment it's
6//! created, a busy write endpoint gets crushed by network RTT. Batching
7//! brings the per-doc cost down to microseconds.
8//!
9//! Backpressure: the channel has a fixed capacity (default 1024). If
10//! the indexer falls behind, `queue()` blocks. That's intentional — it
11//! means the write path can't scribble data into the pipeline faster
12//! than it can be indexed.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use serde_json::Value;
18use tokio::sync::mpsc;
19use tokio::time::{interval, MissedTickBehavior};
20
21use crate::error::Result;
22
23use super::client::MeiliClient;
24
25/// A single write to propagate to the search index.
26#[derive(Debug, Clone)]
27pub enum IndexJob {
28    Upsert {
29        index: String,
30        primary_key: String,
31        document: Value,
32    },
33    Delete {
34        index: String,
35        id: String,
36    },
37}
38
39/// Handle to the background indexer. Cheap to clone.
40#[derive(Clone)]
41pub struct Indexer {
42    tx: mpsc::Sender<IndexJob>,
43}
44
45impl Indexer {
46    /// Spawn the worker. Returns an `Indexer` you clone into anywhere
47    /// that writes data (handlers, background tasks, migrations).
48    pub fn spawn(client: Arc<MeiliClient>, capacity: usize) -> Self {
49        let (tx, rx) = mpsc::channel(capacity);
50        tokio::spawn(run_worker(client, rx));
51        Self { tx }
52    }
53
54    /// Queue a job. Blocks if the channel is full.
55    pub async fn queue(&self, job: IndexJob) -> Result<()> {
56        self.tx
57            .send(job)
58            .await
59            .map_err(|e| crate::error::Error::Internal(format!("indexer queue: {e}")))
60    }
61
62    /// Fire-and-forget version. Used by the admin writes — we don't
63    /// want a stalled indexer to turn a user-visible POST into an error.
64    pub fn queue_detached(&self, job: IndexJob) {
65        let tx = self.tx.clone();
66        tokio::spawn(async move {
67            if let Err(e) = tx.send(job).await {
68                log::warn!("indexer queue dropped: {e}");
69            }
70        });
71    }
72}
73
74async fn run_worker(client: Arc<MeiliClient>, mut rx: mpsc::Receiver<IndexJob>) {
75    // Batching window: drain up to 500 pending jobs or 100ms, whichever
76    // comes first. Good balance between responsiveness (someone adds a
77    // post and wants it searchable "right now") and throughput (bulk
78    // index beats single-doc index by ~10×).
79    let mut batch: Vec<IndexJob> = Vec::with_capacity(512);
80    let mut tick = interval(Duration::from_millis(100));
81    tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
82
83    loop {
84        tokio::select! {
85            maybe_job = rx.recv() => {
86                match maybe_job {
87                    Some(job) => {
88                        batch.push(job);
89                        // Greedy drain — take whatever else is immediately ready.
90                        while batch.len() < 500 {
91                            match rx.try_recv() {
92                                Ok(more) => batch.push(more),
93                                Err(_) => break,
94                            }
95                        }
96                    }
97                    None => {
98                        // Channel closed; flush and exit.
99                        flush(&client, &mut batch).await;
100                        break;
101                    }
102                }
103            }
104            _ = tick.tick() => {
105                if !batch.is_empty() {
106                    flush(&client, &mut batch).await;
107                }
108            }
109        }
110    }
111}
112
113async fn flush(client: &MeiliClient, batch: &mut Vec<IndexJob>) {
114    if batch.is_empty() {
115        return;
116    }
117    // Group by (index, op). Upserts can go as bulk; deletes are per-id.
118    let mut upserts: std::collections::HashMap<(String, String), Vec<Value>> =
119        std::collections::HashMap::new();
120    let mut deletes: Vec<(String, String)> = Vec::new();
121
122    for job in batch.drain(..) {
123        match job {
124            IndexJob::Upsert { index, primary_key, document } => {
125                upserts.entry((index, primary_key)).or_default().push(document);
126            }
127            IndexJob::Delete { index, id } => {
128                deletes.push((index, id));
129            }
130        }
131    }
132
133    for ((index, primary_key), docs) in upserts {
134        let count = docs.len();
135        if let Err(e) = client.add_documents(&index, &docs, &primary_key).await {
136            log::warn!("indexer upsert failed ({index}, {count} docs): {e}");
137        } else {
138            log::debug!("indexer upserted {count} docs into {index}");
139        }
140    }
141
142    for (index, id) in deletes {
143        if let Err(e) = client.delete_document(&index, &id).await {
144            log::warn!("indexer delete failed ({index}, {id}): {e}");
145        }
146    }
147}