use crate::context::DeviceMetrics;
use crate::control_sync::{
ControlSync, ControlSyncConfig, ControlSyncHandler, ControlSyncProvider,
NoopControlSyncHandler, NoopControlSyncProvider,
};
use crate::device::ResourceMonitor;
use crate::event_bus::{EventBus, OrchestratorEvent};
use crate::executor::Executor;
use crate::orchestrator::policy_engine::DefaultPolicyEngine;
use crate::orchestrator::routing_engine::DefaultRoutingEngine;
use crate::orchestrator::{
ExecutionMode, LocalAuthority, OrchestrationAuthority, Orchestrator, OrchestratorError,
};
#[cfg(any(target_os = "macos", target_os = "ios"))]
use crate::runtime_adapter::CoreMLRuntimeAdapter;
#[cfg(target_os = "android")]
use crate::runtime_adapter::ONNXMobileRuntimeAdapter;
use crate::runtime_adapter::{OnnxRuntimeAdapter, RuntimeAdapter};
use crate::streaming::manager::StreamManager;
use crate::telemetry::{Severity, Telemetry};
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Deserialize)]
struct BootstrapConfig {
#[serde(default)]
execution_mode: Option<String>,
#[serde(default)]
adapters: Option<AdapterConfig>,
}
#[derive(Debug, Clone, serde::Deserialize)]
struct AdapterConfig {
#[serde(default = "default_true")]
local: bool,
#[serde(default = "default_true")]
cloud: bool,
#[serde(default = "default_false")]
mock: bool,
}
fn default_true() -> bool {
true
}
fn default_false() -> bool {
false
}
impl Orchestrator {
pub fn bootstrap(config_path: Option<&Path>) -> Result<Self, OrchestratorError> {
let event_bus = EventBus::new();
event_bus.publish(OrchestratorEvent::BootstrapStart {
context: Default::default(),
});
let config = if let Some(path) = config_path {
load_config(path)?
} else {
None
};
let telemetry = Arc::new(Telemetry::new());
telemetry.log_bootstrap_start();
let policy_engine = Box::new(DefaultPolicyEngine::with_default_policy());
event_bus.publish(OrchestratorEvent::ComponentInitialized {
component: "policy_engine".to_string(),
context: Default::default(),
});
let routing_engine = Box::new(DefaultRoutingEngine::new());
event_bus.publish(OrchestratorEvent::ComponentInitialized {
component: "routing_engine".to_string(),
context: Default::default(),
});
let mut executor = Executor::new();
event_bus.publish(OrchestratorEvent::ComponentInitialized {
component: "executor".to_string(),
context: Default::default(),
});
let adapter_config = config
.as_ref()
.and_then(|c| c.adapters.as_ref())
.cloned()
.unwrap_or(AdapterConfig {
local: true,
cloud: true,
mock: false,
});
if adapter_config.local {
#[cfg(any(target_os = "macos", target_os = "ios"))]
{
let adapter = Arc::new(CoreMLRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "coreml".to_string(),
context: Default::default(),
});
}
#[cfg(target_os = "android")]
{
let adapter = Arc::new(ONNXMobileRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "onnx-mobile".to_string(),
context: Default::default(),
});
}
#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android")))]
{
let adapter = Arc::new(OnnxRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "onnx".to_string(),
context: Default::default(),
});
}
#[cfg(any(target_os = "macos", target_os = "ios"))]
{
let adapter = Arc::new(OnnxRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "onnx".to_string(),
context: Default::default(),
});
}
#[cfg(target_os = "android")]
{
let adapter = Arc::new(OnnxRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "onnx".to_string(),
context: Default::default(),
});
}
}
if adapter_config.cloud {
let adapter = Arc::new(CloudRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "cloud".to_string(),
context: Default::default(),
});
}
if adapter_config.mock {
let adapter = Arc::new(MockRuntimeAdapter::new());
executor.register_adapter(adapter);
event_bus.publish(OrchestratorEvent::AdapterRegistered {
name: "mock".to_string(),
context: Default::default(),
});
}
let stream_manager = StreamManager::new();
let resource_monitor = ResourceMonitor::global();
let execution_mode = config
.as_ref()
.and_then(|c| c.execution_mode.as_ref())
.map(|m| match m.as_str() {
"streaming" => ExecutionMode::Streaming,
_ => ExecutionMode::Batch,
})
.unwrap_or(ExecutionMode::Batch);
let _device_metrics = DeviceMetrics::default();
let control_sync = {
let provider: Arc<dyn ControlSyncProvider> = Arc::new(NoopControlSyncProvider);
let handler: Arc<dyn ControlSyncHandler> = Arc::new(NoopControlSyncHandler);
match ControlSync::new(
ControlSyncConfig::default(),
provider,
handler,
telemetry.clone(),
) {
Ok(manager) => Some(manager),
Err(err) => {
telemetry.log_control_sync_event(
Severity::Error,
"spawn_failed",
json!({ "error": err.to_string() }),
);
None
}
}
};
let authority: Box<dyn OrchestrationAuthority> = Box::new(LocalAuthority::new());
event_bus.publish(OrchestratorEvent::ComponentInitialized {
component: "authority".to_string(),
context: Default::default(),
});
event_bus.publish(OrchestratorEvent::ExecutorReady {
context: Default::default(),
});
event_bus.publish(OrchestratorEvent::OrchestratorReady {
context: Default::default(),
});
let orchestrator = Orchestrator::with_all(
authority,
policy_engine,
routing_engine,
executor,
stream_manager,
event_bus,
telemetry.clone(),
resource_monitor,
control_sync,
execution_mode,
);
if orchestrator.control_sync.is_some() {
orchestrator.telemetry.log_control_sync_event(
Severity::Debug,
"worker_ready",
json!({}),
);
}
orchestrator.telemetry.log_bootstrap_complete();
Ok(orchestrator)
}
}
fn load_config(path: &Path) -> Result<Option<BootstrapConfig>, OrchestratorError> {
if !path.exists() {
return Ok(None);
}
let content = std::fs::read_to_string(path).map_err(|e| {
OrchestratorError::Other(format!(
"Failed to read config file '{}': {}",
path.display(),
e
))
})?;
let config: BootstrapConfig = serde_yaml::from_str(&content).map_err(|e| {
OrchestratorError::Other(format!(
"Failed to parse config file '{}': {}",
path.display(),
e
))
})?;
Ok(Some(config))
}
struct CloudRuntimeAdapter {
}
impl CloudRuntimeAdapter {
fn new() -> Self {
Self {}
}
}
impl RuntimeAdapter for CloudRuntimeAdapter {
fn name(&self) -> &str {
"cloud"
}
fn supported_formats(&self) -> Vec<&'static str> {
vec!["onnx", "tensorflow", "pytorch"]
}
fn load_model(&mut self, _path: &str) -> crate::runtime_adapter::AdapterResult<()> {
Ok(())
}
fn execute(
&self,
input: &crate::ir::Envelope,
) -> crate::runtime_adapter::AdapterResult<crate::ir::Envelope> {
use crate::ir::EnvelopeKind;
use std::thread;
thread::sleep(std::time::Duration::from_millis(50));
let output = match &input.kind {
EnvelopeKind::Audio(_) => EnvelopeKind::Text("cloud-output-transcribed".to_string()),
EnvelopeKind::Text(t) => EnvelopeKind::Text(format!("cloud-output-{}", t)),
EnvelopeKind::Embedding(_) => EnvelopeKind::Text("cloud-output".to_string()),
};
Ok(crate::ir::Envelope::new(output))
}
}
struct MockRuntimeAdapter {
}
impl MockRuntimeAdapter {
fn new() -> Self {
Self {}
}
}
impl RuntimeAdapter for MockRuntimeAdapter {
fn name(&self) -> &str {
"mock"
}
fn supported_formats(&self) -> Vec<&'static str> {
vec!["*"] }
fn load_model(&mut self, _path: &str) -> crate::runtime_adapter::AdapterResult<()> {
Ok(())
}
fn execute(
&self,
input: &crate::ir::Envelope,
) -> crate::runtime_adapter::AdapterResult<crate::ir::Envelope> {
use crate::ir::EnvelopeKind;
let output = match &input.kind {
EnvelopeKind::Audio(_) => EnvelopeKind::Text("mock-output-transcribed".to_string()),
EnvelopeKind::Text(t) => EnvelopeKind::Text(format!("mock-output-{}", t)),
EnvelopeKind::Embedding(_) => EnvelopeKind::Text("mock-output".to_string()),
};
Ok(crate::ir::Envelope::new(output))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bootstrap_default() {
let orchestrator = Orchestrator::bootstrap(None);
assert!(orchestrator.is_ok());
let orchestrator = orchestrator.unwrap();
assert_eq!(*orchestrator.execution_mode(), ExecutionMode::Batch);
assert!(orchestrator
.executor
.list_adapters()
.contains(&"onnx".to_string()));
}
#[test]
fn test_bootstrap_with_cloud_adapter() {
use std::io::Write;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new().unwrap();
writeln!(file, "adapters:\n cloud: true").unwrap();
let path = file.path();
let orchestrator = Orchestrator::bootstrap(Some(path));
assert!(orchestrator.is_ok());
let orchestrator = orchestrator.unwrap();
let adapters = orchestrator.executor.list_adapters();
assert!(adapters.contains(&"cloud".to_string()));
}
#[test]
fn test_bootstrap_with_mock_adapter() {
use std::io::Write;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new().unwrap();
writeln!(file, "adapters:\n mock: true").unwrap();
let path = file.path();
let orchestrator = Orchestrator::bootstrap(Some(path));
assert!(orchestrator.is_ok());
let orchestrator = orchestrator.unwrap();
let adapters = orchestrator.executor.list_adapters();
assert!(adapters.contains(&"mock".to_string()));
}
}