use std::sync::Arc;
use std::time::Duration;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use crate::error::Result;
use super::client::MeiliClient;
/// A unit of work handed to the background indexing worker.
///
/// Jobs are sent over the [`Indexer`] channel and applied to the search
/// backend in batches by the worker task.
#[derive(Debug, Clone)]
pub enum IndexJob {
/// Send `document` to `index` through the client's `add_documents` call.
Upsert {
/// Name of the target index.
index: String,
/// Primary-key name forwarded to `add_documents` together with the document.
primary_key: String,
/// The JSON document to submit.
document: Value,
},
/// Remove a single document from `index` through the client's
/// `delete_document` call.
Delete {
/// Name of the target index.
index: String,
/// Identifier of the document to remove.
id: String,
},
}
/// Cloneable handle used to submit [`IndexJob`]s to the background worker.
///
/// Every clone shares the same underlying channel sender.
#[derive(Clone)]
pub struct Indexer {
// Sending half of the bounded job channel; the worker task owns the receiver.
tx: mpsc::Sender<IndexJob>,
}
impl Indexer {
pub fn spawn(client: Arc<MeiliClient>, capacity: usize) -> Self {
let (tx, rx) = mpsc::channel(capacity);
tokio::spawn(run_worker(client, rx));
Self { tx }
}
pub async fn queue(&self, job: IndexJob) -> Result<()> {
self.tx
.send(job)
.await
.map_err(|e| crate::error::Error::Internal(format!("indexer queue: {e}")))
}
pub fn queue_detached(&self, job: IndexJob) {
let tx = self.tx.clone();
tokio::spawn(async move {
if let Err(e) = tx.send(job).await {
log::warn!("indexer queue dropped: {e}");
}
});
}
}
/// Worker loop: receives jobs from `rx`, collects them into a batch and
/// flushes the batch to the search backend.
///
/// The batch is flushed when any of the following happens:
/// - it reaches `MAX_BATCH` jobs (so it can never grow without bound while
///   producers are faster than the timer),
/// - the periodic timer fires while the batch is non-empty,
/// - the channel is closed (every sender dropped); the remaining jobs are
///   flushed once and the task exits.
async fn run_worker(client: Arc<MeiliClient>, mut rx: mpsc::Receiver<IndexJob>) {
    /// Upper bound on the number of jobs sent to the backend in one flush.
    const MAX_BATCH: usize = 500;
    /// Longest time a job waits in a partially filled batch.
    const FLUSH_INTERVAL: Duration = Duration::from_millis(100);

    let mut batch: Vec<IndexJob> = Vec::with_capacity(512);
    let mut tick = interval(FLUSH_INTERVAL);
    // A slow flush must not be followed by a burst of catch-up ticks.
    tick.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            maybe_job = rx.recv() => {
                match maybe_job {
                    Some(job) => {
                        batch.push(job);
                        // Pull whatever is already queued without awaiting.
                        while batch.len() < MAX_BATCH {
                            match rx.try_recv() {
                                Ok(more) => batch.push(more),
                                Err(_) => break,
                            }
                        }
                        // A full batch is sent right away instead of waiting
                        // for the next tick and growing past the cap.
                        if batch.len() >= MAX_BATCH {
                            flush(&client, &mut batch).await;
                        }
                    }
                    None => {
                        // Channel closed: send what is left and stop.
                        flush(&client, &mut batch).await;
                        break;
                    }
                }
            }
            _ = tick.tick() => {
                if !batch.is_empty() {
                    flush(&client, &mut batch).await;
                }
            }
        }
    }
}
async fn flush(client: &MeiliClient, batch: &mut Vec<IndexJob>) {
if batch.is_empty() {
return;
}
let mut upserts: std::collections::HashMap<(String, String), Vec<Value>> =
std::collections::HashMap::new();
let mut deletes: Vec<(String, String)> = Vec::new();
for job in batch.drain(..) {
match job {
IndexJob::Upsert { index, primary_key, document } => {
upserts.entry((index, primary_key)).or_default().push(document);
}
IndexJob::Delete { index, id } => {
deletes.push((index, id));
}
}
}
for ((index, primary_key), docs) in upserts {
let count = docs.len();
if let Err(e) = client.add_documents(&index, &docs, &primary_key).await {
log::warn!("indexer upsert failed ({index}, {count} docs): {e}");
} else {
log::debug!("indexer upserted {count} docs into {index}");
}
}
for (index, id) in deletes {
if let Err(e) = client.delete_document(&index, &id).await {
log::warn!("indexer delete failed ({index}, {id}): {e}");
}
}
}