use std::sync::Arc;
use ai_store_core::{Event, Label, ProjectionSink, Seq, StoreError, StreamId};
use async_trait::async_trait;
use serde_json::Value;
/// Blocking (non-`async`) sink interface.
///
/// [`BlockingSink`] wraps an implementor and exposes it as a
/// [`ProjectionSink`], forwarding each call to the method of the same name
/// declared here.
///
/// NOTE(review): the exact meaning of `state` and `event` is presumably
/// defined by the `ProjectionSink` contract in `ai_store_core` — confirm there.
pub trait SyncProjectionSink: Send + Sync + 'static {
    /// Returns this sink's identifier.
    fn id(&self) -> &str;

    /// Handles a commit for `stream` at `seq`, given `state` and `event`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`StoreError`] the implementation reports.
    fn commit(
        &self,
        stream: &StreamId,
        seq: Seq,
        state: &Value,
        event: &Event,
    ) -> Result<(), StoreError>;

    /// Optional hook for a label-set notification.
    ///
    /// The default implementation ignores every argument and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// The default never fails; overriding implementations may return a
    /// [`StoreError`].
    fn on_label_set(
        &self,
        _stream: &StreamId,
        _label: &Label,
        _at: Seq,
        _state: &Value,
        _event: &Event,
    ) -> Result<(), StoreError> {
        Ok(())
    }

    /// Optional hook for a label-deleted notification.
    ///
    /// The default implementation ignores every argument and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// The default never fails; overriding implementations may return a
    /// [`StoreError`].
    fn on_label_deleted(&self, _stream: &StreamId, _label: &Label) -> Result<(), StoreError> {
        Ok(())
    }
}
/// Selects how a [`BlockingSink`] invokes the synchronous sink it wraps.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Dispatch {
    /// Call the wrapped sink directly from the async method, borrowing the
    /// caller's arguments.
    Inline,
    /// Clone the arguments and run the wrapped sink through
    /// `tokio::task::spawn_blocking`.
    SpawnBlocking,
}
/// Adapter that exposes a [`SyncProjectionSink`] through the async
/// [`ProjectionSink`] trait.
pub struct BlockingSink<T>
where
    T: SyncProjectionSink,
{
    /// The wrapped synchronous sink, held in an `Arc` so an owned handle can
    /// be moved into a spawned blocking closure.
    inner: Arc<T>,
    /// How calls are forwarded to `inner`.
    dispatch: Dispatch,
}
impl<T: SyncProjectionSink> BlockingSink<T> {
pub fn new(inner: T) -> Self {
Self {
inner: Arc::new(inner),
dispatch: Dispatch::SpawnBlocking,
}
}
pub fn inline(inner: T) -> Self {
Self {
inner: Arc::new(inner),
dispatch: Dispatch::Inline,
}
}
pub fn inner(&self) -> &Arc<T> {
&self.inner
}
}
/// Runs `job` via `tokio::task::spawn_blocking` and awaits its result.
///
/// A failure to join the spawned task is reported as `StoreError::Backend`;
/// otherwise the job's own result is returned unchanged.
async fn run_blocking<F>(job: F) -> Result<(), StoreError>
where
    F: FnOnce() -> Result<(), StoreError> + Send + 'static,
{
    match tokio::task::spawn_blocking(job).await {
        Ok(outcome) => outcome,
        Err(e) => Err(StoreError::Backend(format!("blocking sink join: {e}"))),
    }
}

/// Async facade over the wrapped [`SyncProjectionSink`].
///
/// Each method either calls the wrapped sink directly ([`Dispatch::Inline`])
/// or clones its arguments and runs the call through
/// `tokio::task::spawn_blocking` ([`Dispatch::SpawnBlocking`]).
#[async_trait]
impl<T> ProjectionSink for BlockingSink<T>
where
    T: SyncProjectionSink,
{
    /// Returns the wrapped sink's identifier.
    fn id(&self) -> &str {
        self.inner.id()
    }

    /// Forwards to [`SyncProjectionSink::commit`] according to the dispatch mode.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sink's error, or `StoreError::Backend` if the
    /// spawned blocking task could not be joined.
    async fn commit(
        &self,
        stream: &StreamId,
        seq: Seq,
        state: &Value,
        event: &Event,
    ) -> Result<(), StoreError> {
        match self.dispatch {
            Dispatch::Inline => self.inner.commit(stream, seq, state, event),
            Dispatch::SpawnBlocking => {
                // The spawned closure must be `'static`, so it gets owned copies.
                let sink = Arc::clone(&self.inner);
                let (stream, state, event) = (stream.clone(), state.clone(), event.clone());
                run_blocking(move || sink.commit(&stream, seq, &state, &event)).await
            }
        }
    }

    /// Forwards to [`SyncProjectionSink::on_label_set`] according to the
    /// dispatch mode.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sink's error, or `StoreError::Backend` if the
    /// spawned blocking task could not be joined.
    async fn on_label_set(
        &self,
        stream: &StreamId,
        label: &Label,
        at: Seq,
        state: &Value,
        event: &Event,
    ) -> Result<(), StoreError> {
        match self.dispatch {
            Dispatch::Inline => self.inner.on_label_set(stream, label, at, state, event),
            Dispatch::SpawnBlocking => {
                // The spawned closure must be `'static`, so it gets owned copies.
                let sink = Arc::clone(&self.inner);
                let (stream, label, state, event) =
                    (stream.clone(), label.clone(), state.clone(), event.clone());
                run_blocking(move || sink.on_label_set(&stream, &label, at, &state, &event)).await
            }
        }
    }

    /// Forwards to [`SyncProjectionSink::on_label_deleted`] according to the
    /// dispatch mode.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sink's error, or `StoreError::Backend` if the
    /// spawned blocking task could not be joined.
    async fn on_label_deleted(&self, stream: &StreamId, label: &Label) -> Result<(), StoreError> {
        match self.dispatch {
            Dispatch::Inline => self.inner.on_label_deleted(stream, label),
            Dispatch::SpawnBlocking => {
                // The spawned closure must be `'static`, so it gets owned copies.
                let sink = Arc::clone(&self.inner);
                let (stream, label) = (stream.clone(), label.clone());
                run_blocking(move || sink.on_label_deleted(&stream, &label)).await
            }
        }
    }
}