rustio_core/search/
indexer.rs1use std::sync::Arc;
15use std::time::Duration;
16
17use serde_json::Value;
18use tokio::sync::mpsc;
19use tokio::time::{interval, MissedTickBehavior};
20
21use crate::error::Result;
22
23use super::client::MeiliClient;
24
25#[derive(Debug, Clone)]
27pub enum IndexJob {
28 Upsert {
29 index: String,
30 primary_key: String,
31 document: Value,
32 },
33 Delete {
34 index: String,
35 id: String,
36 },
37}
38
39#[derive(Clone)]
41pub struct Indexer {
42 tx: mpsc::Sender<IndexJob>,
43}
44
45impl Indexer {
46 pub fn spawn(client: Arc<MeiliClient>, capacity: usize) -> Self {
49 let (tx, rx) = mpsc::channel(capacity);
50 tokio::spawn(run_worker(client, rx));
51 Self { tx }
52 }
53
54 pub async fn queue(&self, job: IndexJob) -> Result<()> {
56 self.tx
57 .send(job)
58 .await
59 .map_err(|e| crate::error::Error::Internal(format!("indexer queue: {e}")))
60 }
61
62 pub fn queue_detached(&self, job: IndexJob) {
65 let tx = self.tx.clone();
66 tokio::spawn(async move {
67 if let Err(e) = tx.send(job).await {
68 log::warn!("indexer queue dropped: {e}");
69 }
70 });
71 }
72}
73
74async fn run_worker(client: Arc<MeiliClient>, mut rx: mpsc::Receiver<IndexJob>) {
75 let mut batch: Vec<IndexJob> = Vec::with_capacity(512);
80 let mut tick = interval(Duration::from_millis(100));
81 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
82
83 loop {
84 tokio::select! {
85 maybe_job = rx.recv() => {
86 match maybe_job {
87 Some(job) => {
88 batch.push(job);
89 while batch.len() < 500 {
91 match rx.try_recv() {
92 Ok(more) => batch.push(more),
93 Err(_) => break,
94 }
95 }
96 }
97 None => {
98 flush(&client, &mut batch).await;
100 break;
101 }
102 }
103 }
104 _ = tick.tick() => {
105 if !batch.is_empty() {
106 flush(&client, &mut batch).await;
107 }
108 }
109 }
110 }
111}
112
113async fn flush(client: &MeiliClient, batch: &mut Vec<IndexJob>) {
114 if batch.is_empty() {
115 return;
116 }
117 let mut upserts: std::collections::HashMap<(String, String), Vec<Value>> =
119 std::collections::HashMap::new();
120 let mut deletes: Vec<(String, String)> = Vec::new();
121
122 for job in batch.drain(..) {
123 match job {
124 IndexJob::Upsert { index, primary_key, document } => {
125 upserts.entry((index, primary_key)).or_default().push(document);
126 }
127 IndexJob::Delete { index, id } => {
128 deletes.push((index, id));
129 }
130 }
131 }
132
133 for ((index, primary_key), docs) in upserts {
134 let count = docs.len();
135 if let Err(e) = client.add_documents(&index, &docs, &primary_key).await {
136 log::warn!("indexer upsert failed ({index}, {count} docs): {e}");
137 } else {
138 log::debug!("indexer upserted {count} docs into {index}");
139 }
140 }
141
142 for (index, id) in deletes {
143 if let Err(e) = client.delete_document(&index, &id).await {
144 log::warn!("indexer delete failed ({index}, {id}): {e}");
145 }
146 }
147}