use std::collections::VecDeque;
use futures_util::StreamExt;
use serde::Deserialize;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use crate::error::IndexerError;
use crate::event::IndexerAction;
use crate::stream::{EventStream, RawEvent};
#[derive(Debug, Deserialize)]
struct JetstreamFrame {
did: String,
#[serde(default)]
time_us: u64,
kind: String,
#[serde(default)]
commit: Option<JetstreamCommit>,
}
#[derive(Debug, Deserialize)]
struct JetstreamCommit {
rev: String,
operation: String,
collection: String,
rkey: String,
#[serde(default)]
record: Option<serde_json::Value>,
#[serde(default)]
cid: Option<String>,
}
pub fn parse_frame(line: &str) -> Result<Option<RawEvent>, IndexerError> {
let frame: JetstreamFrame = serde_json::from_str(line)
.map_err(|e| IndexerError::Stream(format!("jetstream frame parse: {e}")))?;
if frame.kind != "commit" {
return Ok(None);
}
let Some(commit) = frame.commit else {
return Ok(None);
};
let action = match commit.operation.as_str() {
"create" => IndexerAction::Create,
"update" => IndexerAction::Update,
"delete" => IndexerAction::Delete,
other => {
return Err(IndexerError::Stream(format!(
"unknown jetstream commit operation {other:?} on {}/{}/{}",
frame.did, commit.collection, commit.rkey,
)));
}
};
Ok(Some(RawEvent {
seq: frame.time_us,
live: true,
did: frame.did,
rev: commit.rev,
collection: commit.collection,
rkey: commit.rkey,
action,
cid: commit.cid,
body: commit.record,
}))
}
pub struct JetstreamEventStream {
source: JetstreamSource,
buffered: VecDeque<RawEvent>,
keepalive_interval: std::time::Duration,
}
type LiveWriter = futures_util::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
Message,
>;
type LiveReader = futures_util::stream::SplitStream<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
>;
enum JetstreamSource {
Socket {
writer: LiveWriter,
reader: LiveReader,
},
Lines(VecDeque<String>),
}
impl JetstreamEventStream {
pub async fn connect(url: &str) -> Result<Self, IndexerError> {
let parsed = url::Url::parse(url)
.map_err(|e| IndexerError::Stream(format!("jetstream url {url}: {e}")))?;
let (ws, _resp) = connect_async(parsed.as_str())
.await
.map_err(|e| IndexerError::Stream(format!("jetstream connect: {e}")))?;
let (writer, reader) = ws.split();
Ok(Self {
source: JetstreamSource::Socket { writer, reader },
buffered: VecDeque::new(),
keepalive_interval: std::time::Duration::from_secs(30),
})
}
#[must_use]
pub fn with_keepalive_interval(mut self, interval: std::time::Duration) -> Self {
self.keepalive_interval = interval;
self
}
#[must_use]
pub const fn keepalive_interval(&self) -> std::time::Duration {
self.keepalive_interval
}
pub fn from_lines<I, S>(lines: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self {
source: JetstreamSource::Lines(lines.into_iter().map(Into::into).collect()),
buffered: VecDeque::new(),
keepalive_interval: std::time::Duration::from_secs(30),
}
}
}
impl EventStream for JetstreamEventStream {
async fn next_event(&mut self) -> Result<Option<RawEvent>, IndexerError> {
use futures_util::SinkExt;
loop {
if let Some(ev) = self.buffered.pop_front() {
return Ok(Some(ev));
}
let line = match &mut self.source {
JetstreamSource::Lines(queue) => match queue.pop_front() {
Some(s) => s,
None => return Ok(None),
},
JetstreamSource::Socket { writer, reader } => {
let sleep = tokio::time::sleep(self.keepalive_interval);
tokio::pin!(sleep);
tokio::select! {
maybe_msg = reader.next() => match maybe_msg {
Some(Ok(Message::Text(t))) => String::from(t.as_str()),
Some(Ok(Message::Binary(b))) => String::from_utf8(b)
.map_err(|e| IndexerError::Stream(format!("jetstream binary: {e}")))?,
Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue,
Some(Ok(Message::Close(_))) | None => return Ok(None),
Some(Err(e)) => {
return Err(IndexerError::Stream(format!("jetstream recv: {e}")));
}
},
() = &mut sleep => {
writer
.send(Message::Ping(Vec::new()))
.await
.map_err(|e| IndexerError::Stream(format!(
"jetstream keepalive ping send failed: {e}"
)))?;
continue;
}
}
}
};
if let Some(event) = parse_frame(&line)? {
return Ok(Some(event));
}
}
}
}