use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::error::OversyncError;
use crate::model::{EventEnvelope, RawRow};
/// A pluggable stage that rewrites a batch of event envelopes.
///
/// Implementations take ownership of the incoming batch and hand back the
/// batch that should continue downstream. The returned batch does not have to
/// be the same length as the input.
#[async_trait]
pub trait TransformHook: Send + Sync {
    /// Transforms `envelopes` and returns the resulting batch.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation cannot process
    /// the batch.
    async fn transform(
        &self,
        envelopes: Vec<EventEnvelope>,
    ) -> Result<Vec<EventEnvelope>, OversyncError>;
}
/// A source of rows that can be queried in full or streamed in batches.
#[async_trait]
pub trait OriginConnector: Send + Sync {
    /// Returns the name of this connector.
    fn name(&self) -> &str;

    /// Fetches every row produced by `sql` in a single call.
    ///
    /// NOTE(review): `key_column` presumably names the column that identifies
    /// each row; its exact meaning is defined by implementors — confirm.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation fails to fetch.
    async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError>;

    /// Fetches rows and pushes them into `tx` in batches of at most
    /// `batch_size` rows, returning the total number of rows fetched.
    ///
    /// The default implementation loads everything through
    /// [`OriginConnector::fetch_all`] and then splits the result into batches;
    /// implementors may override it.
    ///
    /// # Errors
    ///
    /// * `OversyncError::Internal` when `batch_size` is zero (no fetch is
    ///   attempted in that case).
    /// * Any error returned by [`OriginConnector::fetch_all`].
    /// * `OversyncError::Internal("channel closed")` when sending on `tx`
    ///   fails.
    async fn fetch_into(
        &self,
        sql: &str,
        key_column: &str,
        batch_size: usize,
        tx: mpsc::Sender<Vec<RawRow>>,
    ) -> Result<usize, OversyncError> {
        // A zero batch size can never make progress; report it as an error
        // instead of panicking while splitting the rows.
        if batch_size == 0 {
            return Err(OversyncError::Internal(
                "batch_size must be greater than zero".into(),
            ));
        }

        let mut rows = self.fetch_all(sql, key_column).await?.into_iter();
        let total = rows.len();

        // Move rows out of the fetched vector batch by batch so no row is
        // cloned on its way into the channel.
        loop {
            let batch: Vec<RawRow> = rows.by_ref().take(batch_size).collect();
            if batch.is_empty() {
                break;
            }
            tx.send(batch)
                .await
                .map_err(|_| OversyncError::Internal("channel closed".into()))?;
        }

        Ok(total)
    }

    /// Checks that the connector can reach its origin.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation reports a failure.
    async fn test_connection(&self) -> Result<(), OversyncError>;
}
/// A destination that event envelopes are delivered to.
#[async_trait]
pub trait Sink: Send + Sync {
    /// Returns the name of this sink.
    fn name(&self) -> &str;

    /// Delivers a single envelope.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation fails to deliver.
    async fn send_event(&self, envelope: &EventEnvelope) -> Result<(), OversyncError>;

    /// Delivers every envelope in `envelopes`, in slice order.
    ///
    /// The default implementation calls [`Sink::send_event`] once per
    /// envelope and stops at the first failure, so envelopes after the
    /// failing one are not attempted.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Sink::send_event`].
    async fn send_batch(&self, envelopes: &[EventEnvelope]) -> Result<(), OversyncError> {
        for item in envelopes.iter() {
            self.send_event(item).await?;
        }
        Ok(())
    }

    /// Checks that the sink can reach its destination.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation reports a failure.
    async fn test_connection(&self) -> Result<(), OversyncError>;
}
/// Builds [`OriginConnector`] instances from JSON configuration.
#[async_trait]
pub trait OriginFactory: Send + Sync {
    /// Returns the connector type string associated with this factory.
    fn connector_type(&self) -> &str;

    /// Creates a boxed connector from `name` and `config`.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation cannot build the
    /// connector.
    async fn create(
        &self,
        name: &str,
        config: &serde_json::Value,
    ) -> Result<Box<dyn OriginConnector>, OversyncError>;
}
/// Builds [`Sink`] instances from JSON configuration.
#[async_trait]
pub trait TargetFactory: Send + Sync {
    /// Returns the sink type string associated with this factory.
    fn sink_type(&self) -> &str;

    /// Creates a boxed sink from `name` and `config`.
    ///
    /// # Errors
    ///
    /// Returns an [`OversyncError`] when the implementation cannot build the
    /// sink.
    async fn create(
        &self,
        name: &str,
        config: &serde_json::Value,
    ) -> Result<Box<dyn Sink>, OversyncError>;
}
/// An ordered collection of [`TransformHook`]s.
pub struct TransformPipeline {
    /// Shared hooks, stored in the order they were supplied.
    hooks: Vec<std::sync::Arc<dyn TransformHook>>,
}
impl TransformPipeline {
pub fn new(hooks: Vec<std::sync::Arc<dyn TransformHook>>) -> Self {
Self { hooks }
}
}
#[async_trait]
impl TransformHook for TransformPipeline {
    /// Feeds `envelopes` through every stored hook in order.
    ///
    /// Each hook receives the output of the previous one. With no hooks the
    /// input is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error of the first hook that fails; later hooks are not
    /// run.
    async fn transform(
        &self,
        envelopes: Vec<EventEnvelope>,
    ) -> Result<Vec<EventEnvelope>, OversyncError> {
        let mut current = envelopes;
        for stage in self.hooks.iter() {
            current = stage.transform(current).await?;
        }
        Ok(current)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{EventEnvelope, EventMeta, OpType};
    use std::sync::Arc;

    /// Hook that returns every incoming envelope twice.
    struct DoubleTransform;

    #[async_trait]
    impl TransformHook for DoubleTransform {
        async fn transform(
            &self,
            envelopes: Vec<EventEnvelope>,
        ) -> Result<Vec<EventEnvelope>, OversyncError> {
            let mut doubled = Vec::with_capacity(envelopes.len() * 2);
            doubled.extend(envelopes.iter().cloned());
            doubled.extend(envelopes);
            Ok(doubled)
        }
    }

    /// Builds a minimal envelope with fixed metadata and an empty JSON object.
    fn test_envelope() -> EventEnvelope {
        let meta = EventMeta {
            op: OpType::Created,
            origin_id: "s".into(),
            query_id: "q".into(),
            key: "k".into(),
            hash: "h".into(),
            cycle_id: 1,
            timestamp: chrono::Utc::now(),
        };
        EventEnvelope {
            meta,
            data: serde_json::json!({}),
        }
    }

    // A hook may return more envelopes than it was given.
    #[tokio::test]
    async fn transform_hook_receives_and_returns_envelopes() {
        let output = DoubleTransform
            .transform(vec![test_envelope()])
            .await
            .unwrap();
        assert_eq!(output.len(), 2);
    }

    // A hook may drop everything it was given.
    #[tokio::test]
    async fn transform_hook_can_return_empty() {
        struct DropAll;

        #[async_trait]
        impl TransformHook for DropAll {
            async fn transform(
                &self,
                _envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Ok(Vec::new())
            }
        }

        let output = DropAll.transform(vec![test_envelope()]).await.unwrap();
        assert!(output.is_empty());
    }

    // A hook may fail, and the error reaches the caller intact.
    #[tokio::test]
    async fn transform_hook_can_return_error() {
        struct FailTransform;

        #[async_trait]
        impl TransformHook for FailTransform {
            async fn transform(
                &self,
                _envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Err(OversyncError::Internal("transform failed".into()))
            }
        }

        let err = FailTransform
            .transform(vec![test_envelope()])
            .await
            .unwrap_err();
        assert!(
            matches!(err, OversyncError::Internal(_)),
            "expected Internal variant, got: {err}"
        );
        assert!(err.to_string().contains("transform failed"));
    }

    // Hooks run in the order they were supplied to the pipeline.
    #[tokio::test]
    async fn pipeline_chains_transforms_in_order() {
        struct AppendSuffix(&'static str);

        #[async_trait]
        impl TransformHook for AppendSuffix {
            async fn transform(
                &self,
                mut envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                for envelope in &mut envelopes {
                    envelope.meta.origin_id.push_str(self.0);
                }
                Ok(envelopes)
            }
        }

        let stages: Vec<Arc<dyn TransformHook>> = vec![
            Arc::new(AppendSuffix("_a")),
            Arc::new(AppendSuffix("_b")),
        ];
        let pipeline = TransformPipeline::new(stages);
        let output = pipeline.transform(vec![test_envelope()]).await.unwrap();
        assert_eq!(output[0].meta.origin_id, "s_a_b");
    }

    // A pipeline without hooks hands its input back.
    #[tokio::test]
    async fn pipeline_empty_hooks_is_passthrough() {
        let pipeline = TransformPipeline::new(Vec::new());
        let input = vec![test_envelope()];
        let output = pipeline.transform(input.clone()).await.unwrap();
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].meta.key, input[0].meta.key);
    }

    // The first failing hook decides the pipeline's result.
    #[tokio::test]
    async fn pipeline_short_circuits_on_error() {
        struct Fail;

        #[async_trait]
        impl TransformHook for Fail {
            async fn transform(
                &self,
                _: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Err(OversyncError::Internal("stage 2 failed".into()))
            }
        }

        let stages: Vec<Arc<dyn TransformHook>> = vec![Arc::new(DoubleTransform), Arc::new(Fail)];
        let pipeline = TransformPipeline::new(stages);
        let outcome = pipeline.transform(vec![test_envelope()]).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("stage 2"));
    }
}