use crate::inbound::MediaFrameRegistry;
use crate::lifecycle::CredentialState;
use crate::wire::webrtc::{
SignalingClient, WebRtcConfig, WebRtcCoordinator, WebSocketSignalingClient,
};
use actr_protocol::{AIdCredential, ActrError, ActrId, ActrType, Realm};
use std::sync::Arc;
/// Builds an [`ActrId`] for tests with the given `serial_number`.
///
/// Everything except the serial number is fixed: realm id `1` and the actor
/// type `acme` / `node` / `1.0.0`.
pub fn make_actor_id(serial_number: u64) -> ActrId {
    let realm = Realm { realm_id: 1 };
    let actor_type = ActrType {
        manufacturer: String::from("acme"),
        name: String::from("node"),
        version: String::from("1.0.0"),
    };

    ActrId {
        realm,
        serial_number,
        r#type: actor_type,
    }
}
/// Returns a placeholder [`AIdCredential`] for tests.
///
/// The credential carries key id `7`, the literal claims `dummy-claims`, and a
/// signature made of 64 zero bytes.
pub fn dummy_credential() -> AIdCredential {
    // Length of the all-zero placeholder signature.
    const SIGNATURE_LEN: usize = 64;

    let claims = bytes::Bytes::from_static(b"dummy-claims");
    let signature = bytes::Bytes::from(vec![0u8; SIGNATURE_LEN]);

    AIdCredential {
        key_id: 7,
        claims,
        signature,
    }
}
/// Wraps `credential` in a [`CredentialState`] for use in tests.
///
/// This is a thin wrapper over `CredentialState::new` that supplies `None`
/// for both of the constructor's remaining arguments.
pub fn create_credential_state_for_test(credential: AIdCredential) -> CredentialState {
// NOTE(review): the meaning of the two `None` arguments is not visible from
// this file — confirm against `CredentialState::new`.
CredentialState::new(credential, None, None)
}
/// Creates a test peer connected to the signaling server at `server_url`.
///
/// The peer uses `id` together with a credential state built from
/// [`dummy_credential`], a default [`WebRtcConfig`], and a fresh
/// [`MediaFrameRegistry`]. After the coordinator is constructed, a detached
/// Tokio task is spawned that awaits `coordinator.start()`.
///
/// Returns the coordinator and the signaling client (as a trait object) that
/// the coordinator was built with.
///
/// # Errors
///
/// Returns an error if the WebSocket signaling connection cannot be
/// established.
pub async fn create_peer_with_websocket(
    id: ActrId,
    server_url: &str,
) -> anyhow::Result<(Arc<WebRtcCoordinator>, Arc<dyn SignalingClient>)> {
    let credential_state = create_credential_state_for_test(dummy_credential());

    // Report a connection failure through the declared `Result` rather than
    // panicking, so callers decide how to react. The error is rendered with
    // `Debug` because that is the only formatting the original `.expect()`
    // call guaranteed to be available.
    let signaling_client = WebSocketSignalingClient::connect_to_with_identity(
        server_url,
        id.clone(),
        credential_state.clone(),
    )
    .await
    .map_err(|e| anyhow::anyhow!("Failed to connect to test server: {:?}", e))?;

    let config = WebRtcConfig::default();
    let media_registry = Arc::new(MediaFrameRegistry::new());
    let signaling_client_arc = signaling_client as Arc<dyn SignalingClient>;

    let coordinator = Arc::new(WebRtcCoordinator::new(
        id,
        credential_state,
        signaling_client_arc.clone(),
        config,
        media_registry,
    ));

    // Run the coordinator in the background; the task is detached and the
    // outcome of `start()` is discarded.
    let c = coordinator.clone();
    tokio::spawn(async move {
        let _ = c.start().await;
    });

    Ok((coordinator, signaling_client_arc))
}
/// Creates a test peer connected to the signaling server at `server_url`,
/// with the coordinator attached to the virtual network `vnet`.
///
/// The peer uses `id` together with a credential state built from
/// [`dummy_credential`], a default [`WebRtcConfig`], and a fresh
/// [`MediaFrameRegistry`]. `vnet` is handed to the coordinator via
/// `set_vnet` before the coordinator is shared, and a detached Tokio task is
/// then spawned that awaits `coordinator.start()`.
///
/// Returns the coordinator and the signaling client (as a trait object) that
/// the coordinator was built with.
///
/// # Errors
///
/// Returns an error if the WebSocket signaling connection cannot be
/// established.
pub async fn create_peer_with_vnet(
    id: ActrId,
    server_url: &str,
    vnet: Arc<webrtc_util::vnet::net::Net>,
) -> anyhow::Result<(Arc<WebRtcCoordinator>, Arc<dyn SignalingClient>)> {
    let credential_state = create_credential_state_for_test(dummy_credential());

    // Report a connection failure through the declared `Result` rather than
    // panicking, so callers decide how to react. The error is rendered with
    // `Debug` because that is the only formatting the original `.expect()`
    // call guaranteed to be available.
    let signaling_client = WebSocketSignalingClient::connect_to_with_identity(
        server_url,
        id.clone(),
        credential_state.clone(),
    )
    .await
    .map_err(|e| anyhow::anyhow!("Failed to connect to test server: {:?}", e))?;

    let config = WebRtcConfig::default();
    let media_registry = Arc::new(MediaFrameRegistry::new());
    let signaling_client_arc = signaling_client as Arc<dyn SignalingClient>;

    // The coordinator must still be uniquely owned (not yet in an `Arc`)
    // while the virtual network is installed, since `set_vnet` needs `&mut`.
    let mut coordinator = WebRtcCoordinator::new(
        id,
        credential_state,
        signaling_client_arc.clone(),
        config,
        media_registry,
    );
    coordinator.set_vnet(vnet);
    let coordinator = Arc::new(coordinator);

    // Run the coordinator in the background; the task is detached and the
    // outcome of `start()` is discarded.
    let c = coordinator.clone();
    tokio::spawn(async move {
        let _ = c.start().await;
    });

    Ok((coordinator, signaling_client_arc))
}
pub fn spawn_response_receiver(
coordinator: Arc<WebRtcCoordinator>,
gate: Arc<crate::outbound::PeerGate>,
peer_name: &str,
) -> tokio::task::JoinHandle<()> {
let peer_name = peer_name.to_string();
tokio::spawn(async move {
use actr_protocol::prost::Message;
tracing::info!("🎯 {} response receiver task started", peer_name);
loop {
match coordinator.receive_message().await {
Ok(Some((_sender_id_bytes, message_data, _payload_type))) => {
match actr_protocol::RpcEnvelope::decode(message_data.as_ref()) {
Ok(envelope) => {
tracing::debug!(
"📨 {} received response: {}",
peer_name,
envelope.request_id
);
let result = if let Some(error) = envelope.error {
Err(ActrError::Unavailable(format!(
"RPC error {}: {}",
error.code, error.message
)))
} else if let Some(payload) = envelope.payload {
Ok(payload)
} else {
Err(ActrError::DecodeFailure(
"Invalid response: no payload or error".to_string(),
))
};
match gate.handle_response(&envelope.request_id, result).await {
Ok(true) => {
tracing::debug!(
"✅ {} handled response for {}",
peer_name,
envelope.request_id
);
}
Ok(false) => {
tracing::warn!(
"⚠️ {} no pending request found for {}",
peer_name,
envelope.request_id
);
}
Err(e) => {
tracing::error!(
"{}: Failed to handle response: {}",
peer_name,
e
);
}
}
}
Err(e) => {
tracing::error!("{}: Failed to decode RpcEnvelope: {}", peer_name, e);
}
}
}
Ok(None) => {
tracing::info!("📪 {} message channel closed", peer_name);
break;
}
Err(e) => {
tracing::error!("{}: Error receiving message: {}", peer_name, e);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
})
}
/// Spawns a Tokio task that answers every incoming request with `pong`.
///
/// The task repeatedly awaits `coordinator.receive_message()`. For each
/// message it decodes the sender bytes as an [`ActrId`] and the payload as an
/// `RpcEnvelope`, then sends back, through `gate.send_message`, an envelope
/// that reuses the request's `request_id`, has route key `response`, payload
/// `pong`, and `timeout_ms` of `0`.
///
/// Messages whose sender id or envelope cannot be decoded are logged and
/// skipped. A receive error is logged and followed by a 100 ms pause before
/// the next attempt. The task ends when `receive_message` yields `Ok(None)`.
///
/// `peer_name` is only used to label log lines. The returned handle refers to
/// the spawned task.
pub fn spawn_echo_responder(
    coordinator: Arc<WebRtcCoordinator>,
    gate: Arc<crate::outbound::PeerGate>,
    peer_name: &str,
) -> tokio::task::JoinHandle<()> {
    let peer_name = peer_name.to_string();
    tokio::spawn(async move {
        use actr_protocol::prost::Message;
        tracing::info!("🎯 {} echo responder task started", peer_name);

        loop {
            // Step 1: obtain the next raw message, or leave / retry.
            let (sender_id_bytes, raw) = match coordinator.receive_message().await {
                Ok(Some((sender_id_bytes, raw, _payload_type))) => (sender_id_bytes, raw),
                Ok(None) => {
                    tracing::info!("📪 {} message channel closed", peer_name);
                    break;
                }
                Err(e) => {
                    tracing::error!("{}: Error receiving message: {}", peer_name, e);
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                    continue;
                }
            };

            // Step 2: the reply target is the decoded sender; without it the
            // message cannot be answered.
            let sender_id = match ActrId::decode(&sender_id_bytes[..]) {
                Ok(id) => id,
                Err(e) => {
                    tracing::error!("{}: Failed to decode sender ID: {}", peer_name, e);
                    continue;
                }
            };

            // Step 3: decode the request envelope; skip messages that do not parse.
            let request = match actr_protocol::RpcEnvelope::decode(raw.as_ref()) {
                Ok(request) => request,
                Err(e) => {
                    tracing::error!("{}: Failed to decode RpcEnvelope: {}", peer_name, e);
                    continue;
                }
            };
            tracing::debug!(
                "📨 {} received request: {}",
                peer_name,
                request.request_id
            );

            // Step 4: reply with a fixed payload under the same request id.
            let response = actr_protocol::RpcEnvelope {
                request_id: request.request_id.clone(),
                route_key: "response".to_string(),
                payload: Some(bytes::Bytes::from("pong")),
                timeout_ms: 0,
                ..Default::default()
            };
            match gate.send_message(&sender_id, response).await {
                Ok(_) => {
                    tracing::debug!(
                        "✅ {} sent response for {}",
                        peer_name,
                        request.request_id
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "{}: Failed to send response for {}: {}",
                        peer_name,
                        request.request_id,
                        e
                    );
                }
            }
        }
    })
}