use std::sync::{Arc, Mutex};
use symbi_runtime::communication::policy_gate::CommunicationPolicyGate;
use symbi_runtime::communication::{
CommunicationBus, CommunicationConfig, DefaultCommunicationBus,
};
use symbi_runtime::context::manager::{ContextManagerConfig, StandardContextManager};
use symbi_runtime::integrations::policy_engine::engine::{
OpaPolicyEngine, PolicyDecision, PolicyEngine,
};
use symbi_runtime::lifecycle::{DefaultLifecycleController, LifecycleConfig, LifecycleController};
use symbi_runtime::reasoning::agent_registry::AgentRegistry;
use symbi_runtime::reasoning::inference::InferenceProvider;
use symbi_runtime::types::agent::AgentConfig;
use symbi_runtime::types::security::Capability;
use symbi_runtime::types::AgentId;
pub struct RuntimeBridge {
lifecycle_controller: Arc<Mutex<Option<Arc<DefaultLifecycleController>>>>,
context_manager: Arc<Mutex<Option<Arc<StandardContextManager>>>>,
policy_engine: Arc<Mutex<OpaPolicyEngine>>,
inference_provider: Arc<Mutex<Option<Arc<dyn InferenceProvider>>>>,
agent_registry: Arc<AgentRegistry>,
comm_bus: Arc<Mutex<Option<Arc<dyn CommunicationBus + Send + Sync>>>>,
comm_policy: Arc<Mutex<Arc<CommunicationPolicyGate>>>,
#[cfg(feature = "session")]
active_session: Arc<Mutex<Option<symbi_session::monitor::SessionId>>>,
#[cfg(feature = "session")]
session_registry: Arc<symbi_runtime::session::SessionRegistry>,
}
impl Default for RuntimeBridge {
fn default() -> Self {
Self::new()
}
}
impl RuntimeBridge {
pub fn new() -> Self {
Self::with_policy(Arc::new(CommunicationPolicyGate::new(Vec::new())))
}
pub fn new_permissive_for_dev() -> Self {
Self::with_policy(Arc::new(CommunicationPolicyGate::permissive()))
}
pub fn with_policy(comm_policy_gate: Arc<CommunicationPolicyGate>) -> Self {
let lifecycle_controller = Arc::new(Mutex::new(None));
let context_manager = Arc::new(Mutex::new(None));
let policy_engine = Arc::new(Mutex::new(OpaPolicyEngine::new()));
let inference_provider = Arc::new(Mutex::new(None));
let agent_registry = Arc::new(AgentRegistry::new());
let comm_bus = Arc::new(Mutex::new(None));
let comm_policy = Arc::new(Mutex::new(comm_policy_gate));
Self {
lifecycle_controller,
context_manager,
policy_engine,
inference_provider,
agent_registry,
comm_bus,
comm_policy,
#[cfg(feature = "session")]
active_session: Arc::new(Mutex::new(None)),
#[cfg(feature = "session")]
session_registry: Arc::new(symbi_runtime::session::SessionRegistry::new()),
}
}
pub fn set_inference_provider(&self, provider: Arc<dyn InferenceProvider>) {
*self.inference_provider.lock().unwrap() = Some(provider);
}
pub fn agent_registry(&self) -> Arc<AgentRegistry> {
Arc::clone(&self.agent_registry)
}
pub async fn register_agent(
&self,
name: &str,
system_prompt: &str,
tools: Vec<String>,
) -> AgentId {
self.agent_registry
.spawn_agent(name, system_prompt, tools, None)
.await
}
pub async fn delegate(&self, target: &str, message: &str) -> crate::error::Result<String> {
let ctx = self.reasoning_context();
crate::dsl::agent_composition::governed_ask(&ctx, target, message, None).await
}
pub async fn delegate_threaded(
&self,
target: &str,
conversation: &symbi_runtime::reasoning::conversation::Conversation,
) -> crate::error::Result<String> {
let ctx = self.reasoning_context();
crate::dsl::agent_composition::governed_ask_conversation(&ctx, target, conversation).await
}
pub async fn agent_system_prompt(&self, name: &str) -> Option<String> {
self.agent_registry
.get_agent(name)
.await
.map(|a| a.system_prompt)
}
pub fn comm_bus(&self) -> Option<Arc<dyn CommunicationBus + Send + Sync>> {
self.comm_bus.lock().unwrap().clone()
}
pub fn set_comm_policy(&self, policy: Arc<CommunicationPolicyGate>) {
*self.comm_policy.lock().unwrap() = policy;
}
pub fn reasoning_context(&self) -> crate::dsl::reasoning_builtins::ReasoningBuiltinContext {
let provider = self.inference_provider.lock().unwrap().clone();
let comm_bus = self.comm_bus.lock().unwrap().clone();
let comm_policy = Some(self.comm_policy.lock().unwrap().clone());
crate::dsl::reasoning_builtins::ReasoningBuiltinContext {
provider,
agent_registry: Some(Arc::clone(&self.agent_registry)),
sender_agent_id: None,
comm_bus,
comm_policy,
reasoning_policy_gate: None,
#[cfg(feature = "session")]
active_session: self.active_session.clone(),
#[cfg(feature = "session")]
session_monitor: Some(self.session_registry.monitor()),
}
}
pub async fn initialize(&self) -> Result<(), String> {
let lifecycle_config = LifecycleConfig::default();
let lifecycle_controller = Arc::new(
DefaultLifecycleController::new(lifecycle_config)
.await
.map_err(|e| format!("Failed to create lifecycle controller: {}", e))?,
);
let context_config = ContextManagerConfig::default();
let context_manager = Arc::new(
StandardContextManager::new(context_config, "runtime_bridge")
.await
.map_err(|e| format!("Failed to create context manager: {}", e))?,
);
context_manager
.initialize()
.await
.map_err(|e| format!("Failed to initialize context manager: {}", e))?;
let bus_config = CommunicationConfig::default();
let bus = Arc::new(
DefaultCommunicationBus::new(bus_config)
.await
.map_err(|e| format!("Failed to create communication bus: {}", e))?,
) as Arc<dyn CommunicationBus + Send + Sync>;
*self.lifecycle_controller.lock().unwrap() = Some(lifecycle_controller);
*self.context_manager.lock().unwrap() = Some(context_manager);
*self.comm_bus.lock().unwrap() = Some(bus);
Ok(())
}
pub async fn initialize_agent(&self, config: AgentConfig) -> Result<AgentId, String> {
let controller = {
let controller_guard = self.lifecycle_controller.lock().unwrap();
controller_guard.clone()
};
if let Some(controller) = controller {
controller
.initialize_agent(config)
.await
.map_err(|e| e.to_string())
} else {
Err("Lifecycle controller not initialized".to_string())
}
}
pub async fn check_capability(
&self,
agent_id: &str,
capability: &Capability,
) -> Result<PolicyDecision, String> {
let engine = {
let engine_guard = self.policy_engine.lock().unwrap();
engine_guard.clone()
};
engine
.check_capability(agent_id, capability)
.await
.map_err(|e| e.to_string())
}
pub async fn register_event_handler(
&self,
agent_id: &str,
event_name: &str,
_event_type: &str,
) -> Result<(), String> {
tracing::info!(
"Registered event handler '{}' for agent {}",
event_name,
agent_id
);
Ok(())
}
#[cfg(feature = "session")]
pub fn open_session(
&self,
global: &symbi_session::Global,
binding: symbi_runtime::session::RoleBinding,
ttl: std::time::Duration,
) -> Result<symbi_session::monitor::SessionId, symbi_runtime::session::RegistryError> {
let sid = self.session_registry.open(global, binding, ttl)?;
let gate = symbi_runtime::communication::policy_gate::CommunicationPolicyGate::permissive()
.with_session_monitor(self.session_registry.monitor())
.with_transcript(self.session_registry.transcript());
self.set_comm_policy(std::sync::Arc::new(gate));
*self.active_session.lock().unwrap() = Some(sid.clone());
Ok(sid)
}
#[cfg(feature = "session")]
pub fn session_transcript(
&self,
) -> std::sync::Arc<std::sync::Mutex<symbi_runtime::session::SessionTranscript>> {
self.session_registry.transcript()
}
pub async fn emit_event(
&self,
agent_id: &str,
event_name: &str,
_data: &serde_json::Value,
) -> Result<(), String> {
tracing::info!("Agent {} emitted event: {}", agent_id, event_name);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "session")]
#[test]
fn reasoning_context_has_no_session_by_default() {
let bridge = RuntimeBridge::new_permissive_for_dev();
let ctx = bridge.reasoning_context();
assert!(ctx.active_session.lock().unwrap().is_none());
assert!(ctx.session_monitor.is_some()); }
#[tokio::test]
async fn delegate_to_unknown_agent_errors_with_name() {
let bridge = RuntimeBridge::new_permissive_for_dev();
let err = bridge.delegate("nope", "hi").await.unwrap_err();
assert!(
format!("{err}").contains("nope"),
"error should name the missing agent"
);
}
#[tokio::test]
async fn delegate_threaded_unknown_agent_errors() {
let bridge = RuntimeBridge::new_permissive_for_dev();
let conv = symbi_runtime::reasoning::conversation::Conversation::with_system("x");
let err = bridge.delegate_threaded("nope", &conv).await.unwrap_err();
assert!(format!("{err}").contains("nope"));
}
#[tokio::test]
async fn agent_system_prompt_roundtrips() {
let bridge = RuntimeBridge::new_permissive_for_dev();
bridge.register_agent("w", "You are w.", vec![]).await;
assert_eq!(
bridge.agent_system_prompt("w").await.as_deref(),
Some("You are w.")
);
assert!(bridge.agent_system_prompt("missing").await.is_none());
}
#[tokio::test]
async fn register_then_registry_has_agent() {
let bridge = RuntimeBridge::new_permissive_for_dev();
bridge
.register_agent("helper", "You are helper.", vec![])
.await;
let ctx = bridge.reasoning_context();
let reg = ctx.agent_registry.as_ref().unwrap();
assert!(reg.has_agent("helper").await);
}
#[tokio::test]
async fn test_reasoning_context_before_init_has_no_bus() {
let bridge = RuntimeBridge::new();
let ctx = bridge.reasoning_context();
assert!(ctx.comm_bus.is_none());
assert!(ctx.comm_policy.is_some());
}
#[tokio::test]
async fn test_new_default_policy_denies() {
use symbi_runtime::types::MessageType;
let bridge = RuntimeBridge::new();
let ctx = bridge.reasoning_context();
let policy = ctx.comm_policy.expect("policy present");
let recipient = AgentId::new();
let request = symbi_runtime::communication::policy_gate::CommunicationRequest {
sender: AgentId::new(),
recipient,
message_type: MessageType::Direct(recipient),
topic: None,
session_id: None,
protocol_label: None,
};
assert!(
policy.evaluate(&request).is_err(),
"default policy must be deny-by-default"
);
}
#[tokio::test]
async fn test_permissive_for_dev_allows() {
use symbi_runtime::types::MessageType;
let bridge = RuntimeBridge::new_permissive_for_dev();
let ctx = bridge.reasoning_context();
let policy = ctx.comm_policy.expect("policy present");
let recipient = AgentId::new();
let request = symbi_runtime::communication::policy_gate::CommunicationRequest {
sender: AgentId::new(),
recipient,
message_type: MessageType::Direct(recipient),
topic: None,
session_id: None,
protocol_label: None,
};
assert!(policy.evaluate(&request).is_ok());
}
#[tokio::test]
async fn test_reasoning_context_after_init_has_bus() {
let bridge = RuntimeBridge::new_permissive_for_dev();
bridge
.initialize()
.await
.expect("initialize should succeed");
let ctx = bridge.reasoning_context();
assert!(
ctx.comm_bus.is_some(),
"Communication bus should be populated after initialize()"
);
assert!(ctx.comm_policy.is_some(), "Policy gate is always present");
}
#[tokio::test]
async fn test_comm_bus_accessor() {
let bridge = RuntimeBridge::new_permissive_for_dev();
assert!(bridge.comm_bus().is_none());
bridge
.initialize()
.await
.expect("initialize should succeed");
assert!(bridge.comm_bus().is_some());
}
#[tokio::test]
async fn test_set_comm_policy_replaces_default() {
let bridge = RuntimeBridge::new();
let new_policy = Arc::new(CommunicationPolicyGate::permissive());
bridge.set_comm_policy(Arc::clone(&new_policy));
let ctx = bridge.reasoning_context();
let retrieved = ctx.comm_policy.expect("policy should be set");
assert!(Arc::ptr_eq(&retrieved, &new_policy));
}
#[cfg(feature = "session")]
#[test]
fn open_session_records_conforming_messages_to_transcript() {
use crate::dsl::agent_composition::check_comm_policy;
use std::time::Duration;
use symbi_runtime::session::RoleBinding;
use symbi_runtime::types::communication::MessageType;
use symbi_runtime::types::AgentId;
use symbi_session::examples::coordinator_pipeline;
let bridge = RuntimeBridge::new_permissive_for_dev();
let (g, _r) = coordinator_pipeline();
let (c, v, p) = (AgentId::new(), AgentId::new(), AgentId::new());
let rb = RoleBinding::new()
.bind(c, "Coordinator")
.bind(v, "Validator")
.bind(p, "Processor");
let _sid = bridge
.open_session(&g, rb, Duration::from_secs(60))
.unwrap();
let ctx = bridge.reasoning_context();
check_comm_policy(&ctx, c, v, MessageType::Direct(v), None).unwrap();
check_comm_policy(&ctx, v, c, MessageType::Direct(c), None).unwrap();
let t = bridge.session_transcript();
let guard = t.lock().unwrap();
assert!(
guard.len() >= 2,
"transcript should have the conforming transitions"
);
assert!(guard.verify());
}
#[cfg(feature = "session")]
#[test]
fn open_session_attaches_monitor_and_sets_active_session() {
use std::time::Duration;
use symbi_runtime::session::RoleBinding;
use symbi_runtime::types::AgentId;
use symbi_session::examples::coordinator_pipeline;
let bridge = RuntimeBridge::new_permissive_for_dev();
let (g, _r) = coordinator_pipeline();
let (c, v, p) = (AgentId::new(), AgentId::new(), AgentId::new());
let rb = RoleBinding::new()
.bind(c, "Coordinator")
.bind(v, "Validator")
.bind(p, "Processor");
let sid = bridge
.open_session(&g, rb, Duration::from_secs(60))
.unwrap();
let ctx = bridge.reasoning_context();
assert_eq!(ctx.active_session.lock().unwrap().as_ref(), Some(&sid));
assert!(ctx.session_monitor.is_some());
}
}