use crate::core::ctx::Ctx;
use crate::core::engine::Engine;
use crate::middleware::SpawnerLayer;
use crate::store::output::OutputStore;
use crate::types::{CapToken, TaskId};
use crate::worker::adapter::{SpawnError, SpawnerAdapter};
use crate::worker::Worker;
use async_trait::async_trait;
use serde_json::Value;
use std::sync::Arc;
pub const DATA_SINK_ENDPOINT_KEY: &str = "data_sink_endpoint";
pub struct SinkMiddleware {
store: Arc<dyn OutputStore>,
endpoint_hint: String,
}
impl SinkMiddleware {
pub fn new(store: Arc<dyn OutputStore>, endpoint_hint: impl Into<String>) -> Self {
Self {
store,
endpoint_hint: endpoint_hint.into(),
}
}
pub fn store(&self) -> &Arc<dyn OutputStore> {
&self.store
}
}
impl SpawnerLayer for SinkMiddleware {
fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
Arc::new(SinkWrapped {
inner,
store: self.store.clone(),
endpoint_hint: self.endpoint_hint.clone(),
})
}
}
struct SinkWrapped {
inner: Arc<dyn SpawnerAdapter>,
#[allow(dead_code)] store: Arc<dyn OutputStore>,
endpoint_hint: String,
}
#[async_trait]
impl SpawnerAdapter for SinkWrapped {
async fn spawn(
&self,
engine: &Engine,
ctx: &Ctx,
task_id: TaskId,
attempt: u32,
token: CapToken,
) -> Result<Box<dyn Worker>, SpawnError> {
let mut new_ctx = ctx.clone();
new_ctx.meta.runtime.insert(
DATA_SINK_ENDPOINT_KEY.to_string(),
Value::String(self.endpoint_hint.clone()),
);
self.inner
.spawn(engine, &new_ctx, task_id, attempt, token)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::output::InMemoryOutputStore;
#[test]
fn new_layer_holds_store_and_hint() {
let store: Arc<dyn OutputStore> = Arc::new(InMemoryOutputStore::new());
let layer = SinkMiddleware::new(store.clone(), "http://127.0.0.1:7785/v1/data/emit");
assert_eq!(layer.endpoint_hint, "http://127.0.0.1:7785/v1/data/emit");
assert!(Arc::ptr_eq(layer.store(), &store));
}
}