use tapped::{Event, EventReceiver};
use crate::error::IndexerError;
use crate::event::IndexerAction;
use crate::stream::{EventStream, RawEvent};
pub struct TappedEventStream {
receiver: EventReceiver,
}
impl TappedEventStream {
#[must_use]
pub const fn new(receiver: EventReceiver) -> Self {
Self { receiver }
}
}
impl EventStream for TappedEventStream {
async fn next_event(&mut self) -> Result<Option<RawEvent>, IndexerError> {
loop {
let received = match self.receiver.recv().await {
Ok(received) => received,
Err(err) => {
let msg = err.to_string();
if msg.to_ascii_lowercase().contains("closed") {
return Ok(None);
}
return Err(IndexerError::Stream(msg));
}
};
let Event::Record(record) = &received.event else {
continue;
};
let action = match record.action {
tapped::RecordAction::Create => IndexerAction::Create,
tapped::RecordAction::Update => IndexerAction::Update,
tapped::RecordAction::Delete => IndexerAction::Delete,
_ => {
return Err(IndexerError::Stream(format!(
"unrecognized tapped action on {}/{}/{}",
record.did, record.collection, record.rkey,
)));
}
};
let body = match record.record_as_str() {
Some(raw) => Some(
serde_json::from_str::<serde_json::Value>(raw)
.map_err(|e| IndexerError::Stream(e.to_string()))?,
),
None => None,
};
return Ok(Some(RawEvent {
seq: record.id,
live: record.live,
did: record.did.clone(),
rev: record.rev.clone(),
collection: record.collection.clone(),
rkey: record.rkey.clone(),
action,
cid: record.cid.clone(),
body,
}));
}
}
}