use std::pin::Pin;
use std::sync::Mutex;
use async_trait::async_trait;
use futures_util::stream;
use futures_util::Stream;
use crate::error::Result;
use crate::event::Delta;
#[derive(Debug, Clone, Default)]
pub struct PullResult {
pub deltas: Vec<Delta>,
pub next_cursor: Option<String>,
pub has_more: bool,
}
pub type DeltaStream = Pin<Box<dyn Stream<Item = Result<Delta>> + Send>>;
#[async_trait]
pub trait SyncSource: Send + Sync {
fn id(&self) -> &str;
async fn pull(&self, cursor: Option<String>) -> Result<PullResult>;
async fn subscribe(&self, cursor: Option<String>) -> Result<DeltaStream> {
let batch = self.pull(cursor).await?;
let deltas: Vec<Result<Delta>> = batch.deltas.into_iter().map(Ok).collect();
Ok(Box::pin(stream::iter(deltas)))
}
}
pub struct StaticSource {
id: String,
batches: Mutex<Vec<PullResult>>,
}
impl StaticSource {
pub fn new(id: impl Into<String>, batches: Vec<PullResult>) -> Self {
Self {
id: id.into(),
batches: Mutex::new(batches),
}
}
pub fn from_deltas(id: impl Into<String>, deltas: Vec<Delta>) -> Self {
Self::new(
id,
vec![PullResult {
deltas,
next_cursor: None,
has_more: false,
}],
)
}
pub fn enqueue(&self, batch: PullResult) {
self.batches.lock().unwrap().push(batch);
}
}
#[async_trait]
impl SyncSource for StaticSource {
fn id(&self) -> &str {
&self.id
}
async fn pull(&self, _cursor: Option<String>) -> Result<PullResult> {
let mut batches = self.batches.lock().unwrap();
if batches.is_empty() {
Ok(PullResult::default())
} else {
Ok(batches.remove(0))
}
}
}