use std::{io::Write, net::TcpStream};
use crate::{
config::TcpPluginConfig,
error::{EventError, Result},
schema::AggregateSchema,
store::{AggregateState, EventRecord},
};
use super::Plugin;
pub(super) struct TcpPlugin {
config: TcpPluginConfig,
}
impl TcpPlugin {
pub(super) fn new(config: TcpPluginConfig) -> Self {
Self { config }
}
pub(super) fn ensure_ready(&self) -> Result<()> {
self.connect().map(|_| ())
}
fn connect(&self) -> Result<TcpStream> {
let addr = format!("{}:{}", self.config.host, self.config.port);
TcpStream::connect(&addr).map_err(|err| EventError::Storage(err.to_string()))
}
}
impl Plugin for TcpPlugin {
fn name(&self) -> &'static str {
"tcp"
}
fn notify_event(
&self,
record: &EventRecord,
_state: &AggregateState,
_schema: Option<&AggregateSchema>,
) -> Result<()> {
let mut stream = self.connect()?;
let payload = serde_json::to_string(record)
.map_err(|err| EventError::Serialization(err.to_string()))?;
stream
.write_all(payload.as_bytes())
.map_err(|err| EventError::Storage(err.to_string()))?;
stream
.write_all(b"\n")
.map_err(|err| EventError::Storage(err.to_string()))?;
Ok(())
}
}