use idiolect_records::{IdiolectFamily, RecordFamily};
use crate::cursor::CursorStore;
use crate::error::IndexerError;
use crate::event::{IndexerAction, IndexerEvent};
use crate::handler::RecordHandler;
use crate::stream::{EventStream, RawEvent};
/// Settings consumed by the indexer drive loop.
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Identifier passed to the cursor store when a sequence number is
    /// committed for this subscription.
    pub subscription_id: String,
}

impl Default for IndexerConfig {
    /// Builds a configuration whose subscription id is `"idiolect-indexer"`.
    fn default() -> Self {
        let subscription_id = String::from("idiolect-indexer");
        Self { subscription_id }
    }
}
/// Pump `stream` until it reports no further events, running every event
/// through the decode / handle / commit pipeline for record family `F`.
///
/// # Errors
///
/// Stops at the first failure and returns it: either an error produced by
/// `stream.next_event()` or one produced while processing an event.
pub async fn drive_indexer<F, S, H, C>(
    stream: &mut S,
    handler: &H,
    cursor_store: &C,
    config: &IndexerConfig,
) -> Result<(), IndexerError>
where
    F: RecordFamily,
    S: EventStream,
    H: RecordHandler<F>,
    C: CursorStore,
{
    loop {
        match stream.next_event().await? {
            Some(next) => {
                process_event::<F, H, C>(next, handler, cursor_store, config).await?;
            }
            // `None` from the stream ends the loop successfully.
            None => return Ok(()),
        }
    }
}
/// Convenience entry point: [`drive_indexer`] with the record family fixed
/// to [`IdiolectFamily`].
///
/// # Errors
///
/// Returns whatever error [`drive_indexer`] returns.
pub async fn drive_idiolect_indexer<S, H, C>(
    stream: &mut S,
    handler: &H,
    cursor_store: &C,
    config: &IndexerConfig,
) -> Result<(), IndexerError>
where
    S: EventStream,
    H: RecordHandler<IdiolectFamily>,
    C: CursorStore,
{
    // Only the family is pinned here; the remaining type parameters are
    // forwarded unchanged.
    drive_indexer::<IdiolectFamily, S, H, C>(stream, handler, cursor_store, config).await
}
/// Process a single raw event for record family `F`.
///
/// Steps, in order:
///
/// 1. Events whose collection is not part of `F` are skipped: the handler
///    is not called and no cursor is committed.
/// 2. For non-delete actions the body is decoded through `F::decode`;
///    delete events carry no record (`record` is `None`), and any body
///    they happen to include is ignored.
/// 3. The assembled [`IndexerEvent`] is passed to `handler.handle`.
/// 4. If the event is flagged `live`, its `seq` is committed to
///    `cursor_store` under `config.subscription_id`. Non-live events are
///    handled but never committed.
///
/// # Errors
///
/// * [`IndexerError::FamilyContract`] — `F::contains` accepted the
///   collection but `F::decode` returned `Ok(None)` for it.
/// * [`IndexerError::Decode`] — `F::decode` returned an error.
/// * [`IndexerError::MissingBody`] — a non-delete event arrived without a
///   body; the message is `did/collection/rkey`.
/// * Any error returned by `handler.handle` or `cursor_store.commit` is
///   propagated with `?`.
async fn process_event<F, H, C>(
    raw: RawEvent,
    handler: &H,
    cursor_store: &C,
    config: &IndexerConfig,
) -> Result<(), IndexerError>
where
    F: RecordFamily,
    H: RecordHandler<F>,
    C: CursorStore,
{
    if !F::contains(&raw.collection) {
        return Ok(());
    }

    // `raw` is owned and its body is not needed after decoding, so the body
    // is moved out of `raw` here instead of being cloned for every event.
    // The remaining fields stay usable and are moved into the event below.
    let record: Option<F::AnyRecord> = match (raw.action, raw.body) {
        (IndexerAction::Delete, _) => None,
        (_, Some(body)) => match F::decode(&raw.collection, body) {
            Ok(Some(decoded)) => Some(decoded),
            Ok(None) => {
                return Err(IndexerError::FamilyContract(raw.collection.to_string()));
            }
            Err(e) => return Err(IndexerError::Decode(e)),
        },
        (_, None) => {
            return Err(IndexerError::MissingBody(format!(
                "{}/{}/{}",
                raw.did, raw.collection, raw.rkey,
            )));
        }
    };

    let event = IndexerEvent::<F> {
        seq: raw.seq,
        live: raw.live,
        did: raw.did,
        rev: raw.rev,
        rkey: raw.rkey,
        collection: raw.collection,
        action: raw.action,
        cid: raw.cid,
        record,
    };

    // The handler runs before the commit: if it fails, `?` returns early and
    // the cursor is left untouched for this event.
    handler.handle(&event).await?;

    if event.live {
        cursor_store
            .commit(&config.subscription_id, event.seq)
            .await?;
    }
    Ok(())
}