use super::*;
use futures::StreamExt;
use harn_vm::event_log::{EventLog, Topic};
fn write_test_file(dir: &Path, relative: &str, contents: &str) {
let path = dir.join(relative);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(path, contents).unwrap();
}
fn stream_manifest_fixture() -> &'static str {
r#"
[package]
name = "fixture"
[exports]
handlers = "lib.harn"
[[triggers]]
id = "ws-stream"
kind = "stream"
provider = "websocket"
path = "/streams/ws"
match = { events = ["quote.tick"] }
handler = "handlers::on_stream"
"#
}
fn stream_handler_fixture(marker_path: &Path) -> String {
format!(
r#"
import "std/triggers"
pub fn on_stream(event: TriggerEvent) {{
write_file({marker:?}, json_stringify({{
provider: event.provider,
kind: event.kind,
key: event.provider_payload.key,
stream: event.provider_payload.stream,
amount: event.provider_payload.raw.value.amount,
}}))
}}
"#,
marker = marker_path.display().to_string()
)
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");
let temp = tempfile::TempDir::new().unwrap();
let marker_path = temp.path().join("stream-handler.json");
write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
write_test_file(
temp.path(),
"lib.harn",
&stream_handler_fixture(&marker_path),
);
let config =
OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
let harness = OrchestratorHarness::start(config)
.await
.expect("harness start");
let base_url = harness.listener_url().to_string();
let event_log = harness.event_log();
let response = reqwest::Client::new()
.post(format!("{base_url}/streams/ws"))
.header("content-type", "application/json")
.json(&serde_json::json!({
"key": "acct-1",
"stream": "quotes",
"value": {"amount": 10}
}))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let topic = Topic::new("orchestrator.lifecycle").unwrap();
let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
let remaining = deadline
.checked_duration_since(tokio::time::Instant::now())
.expect("timed out waiting for pump_dispatch_completed");
let (_, event) = tokio::time::timeout(remaining, stream.next())
.await
.expect("timed out waiting for pump_dispatch_completed event")
.expect("event stream ended unexpectedly")
.expect("event stream error");
if event.kind == "pump_dispatch_completed"
&& event.payload["status"] == serde_json::json!("completed")
{
break;
}
}
drop(stream);
let marker: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
assert_eq!(
marker.get("provider").and_then(|v| v.as_str()),
Some("websocket")
);
assert_eq!(
marker.get("kind").and_then(|v| v.as_str()),
Some("quote.tick")
);
assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
assert_eq!(
marker.get("stream").and_then(|v| v.as_str()),
Some("quotes")
);
assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));
harness
.shutdown(std::time::Duration::from_secs(5))
.await
.expect("harness shutdown");
}