use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use super::EventSink;
use crate::Result;
use crate::events::MarketEvent;
pub struct KafkaSink {
topic: String,
name: String,
}
impl KafkaSink {
pub fn new(
_bootstrap_servers: &str,
topic: impl Into<String>,
) -> std::result::Result<Self, String> {
Ok(Self {
topic: topic.into(),
name: "kafka".to_string(),
})
}
}
#[async_trait]
impl EventSink for KafkaSink {
async fn accept(&self, events: &[MarketEvent]) -> Result<()> {
let payload: Vec<String> = events
.iter()
.map(serde_json::to_string)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| crate::Error::JsonParse(ustr::ustr(&e.to_string())))?;
let _ndjson = payload.join("\n");
let _ = self.topic; Ok(())
}
async fn shutdown(&self, _token: CancellationToken) -> Result<()> {
Ok(())
}
fn name(&self) -> &str {
&self.name
}
}