use std::time::{Duration, Instant};
use apcore::context::Context;
use apcore::module::Module;
use apcore::{APCore, ChunkStream, ModuleError};
use async_stream::stream;
use async_trait::async_trait;
use futures_util::StreamExt;
use serde_json::{json, Value};
use tokio::time::{sleep, timeout};
struct SlowStreamingModule;
#[async_trait]
impl Module for SlowStreamingModule {
fn input_schema(&self) -> Value {
json!({"type": "object"})
}
fn output_schema(&self) -> Value {
json!({"type": "object"})
}
fn description(&self) -> &'static str {
"slow streaming test module"
}
async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
Ok(json!({}))
}
fn stream(&self, _inputs: Value, _ctx: &Context<Value>) -> Option<ChunkStream> {
Some(Box::pin(stream! {
for i in 0..5u32 {
sleep(Duration::from_millis(100)).await;
yield Ok(json!({ "chunk": i }));
}
}))
}
}
/// Test fixture that only implements the non-streaming path.
///
/// It does not override `stream`, so whatever default the `Module` trait
/// supplies for that method applies. `execute` always returns `{"ok": true}`.
struct NonStreamingModule;

#[async_trait]
impl Module for NonStreamingModule {
    /// Short human-readable label for this fixture.
    fn description(&self) -> &'static str {
        "non-streaming module"
    }

    /// Accepts any JSON object as input.
    fn input_schema(&self) -> Value {
        json!({"type": "object"})
    }

    /// Declares any JSON object as output.
    fn output_schema(&self) -> Value {
        json!({"type": "object"})
    }

    /// Ignores its arguments and returns the fixed payload `{"ok": true}`.
    async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
        Ok(json!({"ok": true}))
    }
}
/// Test fixture whose streamed chunks do not satisfy its own output schema.
///
/// The declared output schema requires an integer `result` property, while
/// `stream` only ever yields objects carrying a string `partial` property.
struct BadSchemaStreamingModule;

#[async_trait]
impl Module for BadSchemaStreamingModule {
    /// Short human-readable label for this fixture.
    fn description(&self) -> &'static str {
        "streaming module with bad schema"
    }

    /// Accepts any JSON object as input.
    fn input_schema(&self) -> Value {
        json!({"type": "object"})
    }

    /// Declares an object output with a mandatory integer `result` field.
    fn output_schema(&self) -> Value {
        json!({
            "type": "object",
            "required": ["result"],
            "properties": {
                "result": { "type": "integer" }
            }
        })
    }

    /// Non-streaming entry point: ignores its arguments and returns `{}`.
    async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
        Ok(json!({}))
    }

    /// Streaming entry point: yields `{"partial": "a"}` then `{"partial": "b"}`
    /// with no delay between them.
    fn stream(&self, _inputs: Value, _ctx: &Context<Value>) -> Option<ChunkStream> {
        Some(Box::pin(stream! {
            for label in ["a", "b"] {
                yield Ok(json!({"partial": label}));
            }
        }))
    }
}
/// Checks that chunks are forwarded as they are produced rather than after the
/// whole stream has been buffered.
///
/// `SlowStreamingModule` sleeps 100 ms before each of its five chunks, so the
/// first chunk must be observable within a 300 ms budget, well before the
/// stream as a whole could have finished.
#[tokio::test]
async fn first_chunk_arrives_before_stream_completes() {
    // Single source of truth for both the poll timeout and the elapsed check.
    let budget = Duration::from_millis(300);

    let apcore = APCore::new();
    apcore
        .register("slow.stream", Box::new(SlowStreamingModule))
        .unwrap();

    // The clock starts before the stream is even created, so time spent
    // inside `stream(..)` itself counts against the budget as well.
    let started = Instant::now();
    let mut s = apcore
        .executor()
        .stream("slow.stream", json!({}), None, None);

    let polled = timeout(budget, s.next()).await;
    let maybe_item =
        polled.expect("first chunk should arrive before Vec-collected stream could finish");
    let first_elapsed = started.elapsed();

    let chunk = maybe_item
        .expect("stream yielded None before first chunk")
        .expect("first chunk should be Ok");

    assert_eq!(chunk["chunk"], 0);
    assert!(
        first_elapsed < budget,
        "first chunk arrived too late: {first_elapsed:?}"
    );
}
/// Drains the slow stream completely and checks that exactly five chunks come
/// back, numbered 0 through 4 in emission order, with no error items.
#[tokio::test]
async fn all_chunks_arrive_in_order() {
    let apcore = APCore::new();
    apcore
        .register("slow.stream", Box::new(SlowStreamingModule))
        .unwrap();

    // `map` is lazy, so the `expect` still fires on the first error item the
    // moment it is pulled from the stream.
    let received: Vec<_> = apcore
        .executor()
        .stream("slow.stream", json!({}), None, None)
        .map(|item| item.expect("no chunks should be errors"))
        .collect()
        .await;

    assert_eq!(received.len(), 5);
    // Pair each chunk with the u64 sequence number it is expected to carry.
    for (expected, chunk) in (0u64..).zip(received.iter()) {
        assert_eq!(chunk["chunk"], expected);
    }
}
/// Streams a module that has no `stream` override and checks that the stream
/// consists of a single successful item equal to what `execute` returns
/// (`{"ok": true}`), followed by end-of-stream.
#[tokio::test]
async fn streaming_falls_back_to_execute_when_module_does_not_support_streaming() {
    let apcore = APCore::new();
    apcore
        .register("plain.mod", Box::new(NonStreamingModule))
        .unwrap();

    let mut s = apcore.executor().stream("plain.mod", json!({}), None, None);

    // First item: must exist and must be `Ok`.
    let only_chunk = s
        .next()
        .await
        .expect("stream should yield exactly one chunk equal to execute()'s output")
        .expect("fallback path must succeed");
    assert_eq!(only_chunk, json!({"ok": true}));

    // Nothing may follow the single fallback chunk.
    let trailing = s.next().await;
    assert!(trailing.is_none());
}
/// Streams the slow module with a context whose `global_deadline` lies 0.15 s
/// in the future and checks that the stream reports an error item with code
/// `ModuleTimeout` before all five chunks have been delivered.
#[tokio::test]
async fn streaming_global_deadline_aborts_between_chunks() {
    use apcore::context::Identity;
    use apcore::errors::ErrorCode;
    use std::collections::HashMap;
    use std::time::{SystemTime, UNIX_EPOCH};

    let apcore = APCore::new();
    apcore
        .register("slow.stream", Box::new(SlowStreamingModule))
        .unwrap();

    // The deadline is expressed as fractional seconds since the Unix epoch.
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs_f64();

    let identity = Identity::new(
        String::from("@external"),
        String::from("external"),
        vec![],
        HashMap::new(),
    );
    let mut ctx = Context::<Value>::new(identity);
    ctx.caller_id = Some(String::from("@external"));
    ctx.global_deadline = Some(now_secs + 0.15);

    let mut s = apcore
        .executor()
        .stream("slow.stream", json!({}), Some(&ctx), None);

    let mut ok_count = 0usize;
    let mut timed_out = false;
    while let Some(item) = s.next().await {
        // The first error ends the loop; it has to be the timeout.
        if let Err(err) = item {
            assert_eq!(
                err.code,
                ErrorCode::ModuleTimeout,
                "deadline-exceeded error must be ModuleTimeout"
            );
            timed_out = true;
            break;
        }
        ok_count += 1;
    }

    assert!(
        timed_out,
        "stream must surface a ModuleTimeout once the global deadline elapses"
    );
    assert!(
        ok_count < 5,
        "stream must abort before all 5 chunks are delivered (saw {ok_count})"
    );
}
#[tokio::test]
async fn phase3_validation_failure_is_swallowed_chunks_still_delivered() {
let apcore = APCore::new();
apcore
.register("bad.stream", Box::new(BadSchemaStreamingModule))
.unwrap();
let mut s = apcore
.executor()
.stream("bad.stream", json!({}), None, None);
let mut ok_count = 0usize;
let mut err_count = 0usize;
while let Some(item) = s.next().await {
match item {
Ok(_) => ok_count += 1,
Err(_) => err_count += 1,
}
}
assert_eq!(ok_count, 2, "both data chunks should reach the caller");
assert_eq!(
err_count, 0,
"Phase-3 validation failure must be swallowed (logged) — chunks already delivered"
);
}