use std::{net::SocketAddr, sync::Arc};
use rsclaw::{
MemoryTier,
agent::{AgentRegistry, AgentReply},
config::{
runtime::{
AgentsRuntime, ChannelRuntime, ExtRuntime, GatewayRuntime, ModelRuntime, OpsRuntime,
RuntimeConfig,
},
schema::{AgentEntry, BindMode, GatewayMode, ReloadMode, SessionConfig},
},
gateway::LiveConfig,
server::{AppState, serve},
store::Store,
};
use tokio::sync::broadcast;
/// Returns a loopback address with an OS-assigned port.
///
/// Binds a throwaway listener to port 0 so the OS picks a currently unused
/// port, reads back the resulting address, and drops the listener when the
/// function returns. The caller is expected to bind the address again.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn free_addr() -> SocketAddr {
    std::net::TcpListener::bind("127.0.0.1:0")
        .expect("bind :0")
        .local_addr()
        .expect("local_addr")
}
/// Builds the smallest [`RuntimeConfig`] the tests need.
///
/// The gateway listens on `port` in local mode with a loopback bind and no
/// auth token. Exactly one agent, `"echo"`, is registered and flagged as the
/// default. Every optional section is left at `None` or its `Default` value.
fn config_with_echo_agent(port: u16) -> RuntimeConfig {
    // Gateway: local, loopback-only, unauthenticated.
    let gateway = GatewayRuntime {
        port,
        mode: GatewayMode::Local,
        bind: BindMode::Loopback,
        bind_address: None,
        reload: ReloadMode::Hybrid,
        auth_token: None,
        auth_token_configured: false,
        auth_token_is_plaintext: false,
        allow_tailscale: false,
        channel_health_check_minutes: 5,
        channel_stale_event_threshold_minutes: 30,
        channel_max_restarts_per_hour: 10,
        user_agent: None,
        language: None,
    };

    // The single agent under test; only its id and default flag are set.
    let echo_agent = AgentEntry {
        id: String::from("echo"),
        default: Some(true),
        workspace: None,
        model: None,
        lane: None,
        lane_concurrency: None,
        group_chat: None,
        channels: None,
        name: None,
        commands: None,
        allowed_commands: None,
        opencode: None,
        claudecode: None,
        codex: None,
        flash_model: None,
        agent_dir: None,
        system: None,
        temperature: None,
    };

    let agents = AgentsRuntime {
        defaults: Default::default(),
        list: vec![echo_agent],
        bindings: Vec::new(),
        external: Vec::new(),
    };

    let session = SessionConfig {
        dm_scope: None,
        thread_bindings: None,
        reset: None,
        identity_links: None,
        maintenance: None,
    };

    let channel = ChannelRuntime {
        channels: Default::default(),
        session,
    };

    let model = ModelRuntime {
        models: None,
        auth: None,
    };

    let ext = ExtRuntime {
        tools: None,
        skills: None,
        plugins: None,
        evolution: None,
    };

    let ops = OpsRuntime {
        cron: None,
        hooks: None,
        sandbox: None,
        logging: None,
        secrets: None,
    };

    RuntimeConfig {
        gateway,
        agents,
        channel,
        model,
        ext,
        ops,
        raw: Default::default(),
    }
}
async fn start_echo_server(addr: SocketAddr) {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let config = Arc::new(config_with_echo_agent(addr.port()));
let live = Arc::new(LiveConfig::new((*config).clone()));
let data_dir = tempfile::tempdir().expect("tempdir");
let store = Arc::new(Store::open(data_dir.path(), MemoryTier::Low).expect("store"));
let (event_tx, _) = broadcast::channel(16);
let providers = std::sync::Arc::new(rsclaw::provider::registry::ProviderRegistry::new());
let (registry, receivers) = AgentRegistry::from_config_with_receivers(&config, providers);
for (_id, mut rx) in receivers {
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = msg.reply_tx.send(AgentReply {
text: format!("echo: {}", msg.text),
is_empty: false,
tool_calls: None,
images: vec![],
files: vec![],
pending_analysis: None,
needs_outer_done_emit: false,
});
}
});
}
let state = AppState {
config,
live,
agents: Arc::new(registry),
store,
event_bus: event_tx,
devices: Arc::new(rsclaw::ws::DeviceStore::new(std::path::PathBuf::from(
"/tmp/test-devices.json",
))),
ws_conns: Arc::new(rsclaw::ws::ConnRegistry::new()),
feishu: Arc::new(tokio::sync::OnceCell::new()),
wecom: Arc::new(tokio::sync::OnceCell::new()),
whatsapp: Arc::new(tokio::sync::OnceCell::new()),
line: Arc::new(tokio::sync::OnceCell::new()),
zalo: Arc::new(tokio::sync::OnceCell::new()),
started_at: std::time::Instant::now(),
dm_enforcers: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
custom_webhooks: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
cron_reload: broadcast::channel(1).0,
notification_tx: broadcast::channel(16).0,
wasm_plugins: Arc::new(Vec::new()),
plugins: Arc::new(rsclaw::plugin::PluginRegistry::default()),
restart_request_tx: broadcast::channel(16).0,
pending_restart: Arc::new(std::sync::RwLock::new(None)),
shutdown: rsclaw::gateway::ShutdownCoordinator::new(),
};
std::mem::forget(data_dir);
tokio::spawn(async move {
serve(state, addr).await.expect("serve");
});
for _ in 0..50 {
if reqwest::get(format!("http://{addr}/api/v1/health"))
.await
.is_ok()
{
return;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
panic!("server did not start within 1 s");
}
/// A message posted without a session key gets HTTP 200, a reply containing
/// the original text, and a non-empty `session_key`.
#[tokio::test]
async fn agent_turn_echo_reply() {
    let addr = free_addr();
    start_echo_server(addr).await;

    let url = format!("http://{addr}/api/v1/message");
    let payload = serde_json::json!({"text": "hello from test"});
    let resp = reqwest::Client::new()
        .post(url)
        .json(&payload)
        .send()
        .await
        .expect("POST /api/v1/message");
    assert_eq!(resp.status(), 200, "send_message should return 200");

    let body: serde_json::Value = resp.json().await.expect("JSON body");
    // Both fields must be present as strings before their contents are checked.
    let reply = body
        .get("reply")
        .and_then(serde_json::Value::as_str)
        .expect("reply field missing");
    let session_key = body
        .get("session_key")
        .and_then(serde_json::Value::as_str)
        .expect("session_key field missing");

    assert!(!reply.is_empty(), "reply should not be empty");
    assert!(
        reply.contains("hello from test"),
        "echo should contain the original text, got: {reply}"
    );
    assert!(!session_key.is_empty(), "session_key should be present");
}
/// A caller-supplied `session_key` is returned unchanged in the response body.
#[tokio::test]
async fn agent_turn_explicit_session_key_preserved() {
    let addr = free_addr();
    start_echo_server(addr).await;

    let url = format!("http://{addr}/api/v1/message");
    let payload = serde_json::json!({
        "text": "ping",
        "session_key": "test-session-42"
    });
    let resp = reqwest::Client::new()
        .post(url)
        .json(&payload)
        .send()
        .await
        .expect("POST /api/v1/message");
    assert_eq!(resp.status(), 200);

    let body: serde_json::Value = resp.json().await.expect("JSON body");
    let returned_key = body["session_key"].as_str();
    assert_eq!(
        returned_key,
        Some("test-session-42"),
        "provided session_key should be echoed back in the response"
    );
}
/// A request naming an `agent_id` that is not configured still gets HTTP 200.
#[tokio::test]
async fn agent_turn_unknown_agent_id_falls_back_to_default() {
    let addr = free_addr();
    start_echo_server(addr).await;

    let url = format!("http://{addr}/api/v1/message");
    // "nonexistent" does not match the only configured agent ("echo").
    let payload = serde_json::json!({
        "text": "hello",
        "agent_id": "nonexistent"
    });
    let resp = reqwest::Client::new()
        .post(url)
        .json(&payload)
        .send()
        .await
        .expect("POST /api/v1/message");

    assert_eq!(resp.status(), 200, "should fall back to default agent");
}