pub mod config;
pub mod result;
pub use config::{
AudioInputConfig, AudioSampleFormat, InputConfig, InputType, OutputType as ConfigOutputType,
PipelineSource, TextInputConfig,
};
pub use result::{FfiPipelineExecutionResult, FfiStageExecutionResult};
use crate::model::SdkError;
use crate::registry_client::RegistryClient;
use crate::result::OutputType;
use crate::run_options::RunOptions;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
use xybrid_core::cache_provider::CacheProvider;
use xybrid_core::context::{DeviceMetrics, StageDescriptor, DEVICE_CLASS_SCHEMA_VERSION};
use xybrid_core::device::ResourceMonitor;
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
use xybrid_core::event_bus::{EventContext, OrchestratorEvent};
use xybrid_core::ir::{Envelope, EnvelopeKind};
use xybrid_core::orchestrator::routing_engine::LocalAvailability;
use xybrid_core::orchestrator::{
LocalAuthority, OrchestrationAuthority, Orchestrator, ResolvedTarget, StageContext,
StageExecutionResult,
};
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
use xybrid_core::orchestrator::{PolicyOutcome, PolicyRequest};
use xybrid_core::pipeline::{ExecutionTarget, IntegrationProvider, StageOptions};
use xybrid_core::pipeline_config::PipelineConfig;
/// Result alias used by the pipeline API; the error type is always [`SdkError`].
pub type PipelineResult<T> = Result<T, SdkError>;
/// A parsed pipeline definition that has not been loaded yet.
///
/// Keeps the YAML text it was built from together with the [`PipelineConfig`]
/// deserialized from that text. Call `load` to turn it into a [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineRef {
/// The YAML source exactly as it was handed to `from_yaml`.
yaml_content: String,
/// Configuration deserialized from `yaml_content`.
config: PipelineConfig,
}
impl PipelineRef {
    /// Deserializes `yaml` into a [`PipelineConfig`] and stores the text next to it.
    ///
    /// # Errors
    /// Returns [`SdkError::PipelineError`] if the YAML cannot be deserialized.
    pub fn from_yaml(yaml: &str) -> PipelineResult<Self> {
        match serde_yaml::from_str::<PipelineConfig>(yaml) {
            Ok(config) => Ok(Self {
                yaml_content: yaml.to_owned(),
                config,
            }),
            Err(e) => Err(SdkError::PipelineError(format!(
                "Failed to parse YAML: {}",
                e
            ))),
        }
    }

    /// Reads the file at `path` as UTF-8 text and parses it with [`Self::from_yaml`].
    ///
    /// # Errors
    /// Returns [`SdkError::PipelineError`] if the file cannot be read or the
    /// contents fail to parse.
    pub fn from_file(path: impl Into<PathBuf>) -> PipelineResult<Self> {
        let file_path: PathBuf = path.into();
        match std::fs::read_to_string(&file_path) {
            Ok(text) => Self::from_yaml(&text),
            Err(e) => Err(SdkError::PipelineError(format!(
                "Failed to read file: {}",
                e
            ))),
        }
    }

    /// Pipeline name from the configuration, if one was given.
    pub fn name(&self) -> Option<&str> {
        self.config.name.as_ref().map(|n| n.as_str())
    }

    /// Stage identifiers in configuration order.
    pub fn stage_ids(&self) -> Vec<String> {
        let mut ids = Vec::with_capacity(self.config.stages.len());
        for stage in &self.config.stages {
            ids.push(stage.stage_id());
        }
        ids
    }

    /// Number of stages declared in the configuration.
    pub fn stage_count(&self) -> usize {
        let stages = &self.config.stages;
        stages.len()
    }

    /// Builds a [`Pipeline`] from this reference.
    pub fn load(&self) -> PipelineResult<Pipeline> {
        let loaded = Pipeline::from_ref(self);
        loaded
    }

    /// Async-signature wrapper around [`Self::load`]; the work itself runs
    /// synchronously inside the returned future.
    pub async fn load_async(&self) -> PipelineResult<Pipeline> {
        let loaded = self.load();
        loaded
    }
}
/// Per-stage summary computed when a pipeline is loaded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageInfo {
/// Stage identifier taken from the stage configuration (`stage_id()`).
pub id: String,
/// Model identifier taken from the stage configuration (`model_id()`).
pub model_id: Option<String>,
/// Where the stage is declared to run.
pub target: StageTarget,
/// Availability of the stage at the time the pipeline was loaded.
pub status: StageStatus,
/// Size reported by the registry when the stage still needs a download; `None` otherwise.
pub download_bytes: Option<u64>,
}
/// Declared execution location of a pipeline stage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageTarget {
/// Run on the local device.
Device,
/// No explicit location was recognised; routing is decided later.
Auto,
/// Run remotely (the configuration said `cloud` or `server`).
Cloud,
/// Delegated to a third-party integration identified by `provider`.
Integration { provider: String },
}
/// Renders the target as a short lowercase label; integrations are written
/// as `integration:<provider>`.
impl std::fmt::Display for StageTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            StageTarget::Integration { provider } => {
                // Only this variant carries data, so it is formatted separately.
                return write!(f, "integration:{}", provider);
            }
            StageTarget::Device => "device",
            StageTarget::Auto => "auto",
            StageTarget::Cloud => "cloud",
        };
        f.write_str(label)
    }
}
/// Availability of a stage as determined when the pipeline was loaded.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageStatus {
/// The model is already present in the local cache.
Cached,
/// The model was resolved by the registry but is not cached locally.
NeedsDownload,
/// The stage runs remotely or through an integration; nothing to download.
Integration,
/// The registry lookup failed; the payload is the error text.
Error(String),
}
impl std::fmt::Display for StageStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StageStatus::Cached => write!(f, "cached"),
StageStatus::NeedsDownload => write!(f, "needs_download"),
StageStatus::Integration => write!(f, "integration"),
StageStatus::Error(msg) => write!(f, "error: {}", msg),
}
}
}
/// Progress report passed to the callback of `Pipeline::load_models_with_progress`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadProgress {
/// Model currently being fetched.
pub model_id: String,
/// Progress of the current model, computed as the fetch fraction times 100.
pub percent: u32,
/// Estimated bytes fetched so far (fetch fraction times `bytes_total`).
pub bytes_downloaded: u64,
/// Download size recorded for this stage at load time (0 when unknown).
pub bytes_total: u64,
/// Zero-based position of this stage among the stages that need a download.
pub stage_index: usize,
/// Number of stages that need a download.
pub total_stages: usize,
}
use xybrid_core::pipeline_config::StageConfig;
/// Kind of input a pipeline expects.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PipelineInputType {
/// Audio input.
Audio,
/// Text input.
Text,
/// Embedding vector input.
Embedding,
/// The input kind has not been determined.
Unknown,
}
impl PipelineInputType {
pub fn is_audio(&self) -> bool {
matches!(self, PipelineInputType::Audio)
}
pub fn is_text(&self) -> bool {
matches!(self, PipelineInputType::Text)
}
}
/// Timing and routing summary for one executed stage.
#[derive(Debug, Clone, Serialize)]
pub struct StageTiming {
/// Stage name as reported by the execution result.
pub name: String,
/// Stage latency in milliseconds.
pub latency_ms: u32,
/// Textual form of the target the stage was routed to.
pub target: String,
/// Human-readable explanation of the routing decision.
pub reason: String,
}
/// Outcome of running a pipeline end to end.
#[derive(Debug, Clone, Serialize)]
pub struct PipelineExecutionResult {
/// Pipeline name, if the configuration provided one.
pub name: Option<String>,
/// One entry per executed stage, in execution order.
pub stages: Vec<StageTiming>,
/// Wall-clock time of the whole run in milliseconds.
pub total_latency_ms: u32,
/// Classification of `output` derived from its envelope kind.
pub output_type: OutputType,
/// Envelope produced by the last stage.
pub output: Envelope,
}
impl PipelineExecutionResult {
    /// Text payload of the output, or `None` when the output is not text.
    pub fn text(&self) -> Option<&str> {
        if let EnvelopeKind::Text(content) = &self.output.kind {
            Some(content)
        } else {
            None
        }
    }

    /// Audio payload of the output, or `None` when the output is not audio.
    pub fn audio_bytes(&self) -> Option<&[u8]> {
        if let EnvelopeKind::Audio(samples) = &self.output.kind {
            Some(samples)
        } else {
            None
        }
    }

    /// Embedding payload of the output, or `None` when the output is not an embedding.
    pub fn embedding(&self) -> Option<&[f32]> {
        if let EnvelopeKind::Embedding(vector) = &self.output.kind {
            Some(vector)
        } else {
            None
        }
    }
}
fn pipeline_metrics(options: &RunOptions) -> DeviceMetrics {
options.device_metrics.as_ref().cloned().unwrap_or_default()
}
/// Cache provider that knows about exactly one model at one fixed path.
/// Used by the streaming fast path.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
struct StreamingFastPathCacheProvider {
/// The only model id this provider answers for.
model_id: String,
/// Filesystem location reported for `model_id`.
model_path: PathBuf,
}
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
impl StreamingFastPathCacheProvider {
    /// Creates a provider that reports `model_path` for `model_id` and nothing else.
    fn new(model_id: impl Into<String>, model_path: PathBuf) -> Self {
        let model_id: String = model_id.into();
        Self {
            model_id,
            model_path,
        }
    }
}
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
impl CacheProvider for StreamingFastPathCacheProvider {
    /// A model counts as cached only when it is the configured model and its path exists.
    fn is_model_cached(&self, model_id: &str) -> bool {
        if model_id != self.model_id {
            return false;
        }
        self.model_path.exists()
    }

    /// Returns the configured path when [`Self::is_model_cached`] holds, otherwise `None`.
    fn get_model_path(&self, model_id: &str) -> Option<PathBuf> {
        if self.is_model_cached(model_id) {
            Some(self.model_path.clone())
        } else {
            None
        }
    }

    /// Parent directory of the model path; the model path itself when it has no parent.
    fn cache_dir(&self) -> PathBuf {
        match self.model_path.parent() {
            Some(parent) => parent.to_path_buf(),
            None => self.model_path.clone(),
        }
    }

    /// Static identifier of this provider.
    fn name(&self) -> &'static str {
        "streaming-fast-path"
    }
}
/// Returns a copy of `stage_descriptor` whose `bundle_path` is set to
/// `bundle_path` (converted lossily to a string).
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
fn stage_descriptor_with_bundle_path(
    stage_descriptor: &StageDescriptor,
    bundle_path: &Path,
) -> StageDescriptor {
    let bundle_as_text = bundle_path.to_string_lossy().into_owned();
    let mut updated = stage_descriptor.clone();
    updated.bundle_path = Some(bundle_as_text);
    updated
}
/// Policy and routing outcome computed for the single-stage streaming fast path.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[derive(Debug, Clone)]
struct StreamingFastPathRoute {
/// Whether the policy evaluation allowed the request.
policy_allowed: bool,
/// Reason text returned by the policy evaluation.
policy_reason: Option<String>,
/// Routing target label: `local`, `cloud`, or `fallback:<endpoint>`.
target: String,
/// Formatted routing explanation including source and confidence.
reason: String,
/// Abort rate from the local reliability hint (default value when no hint was returned).
recent_abort_rate: f32,
/// Sample size from the local reliability hint (default value when no hint was returned).
sample_size: u32,
/// `true` when policy allows the request without a transform, routing chose
/// the device, and the stage is locally runnable.
can_stream_locally: bool,
}
/// Evaluates policy and target routing for a single stage and summarises the
/// outcome as a [`StreamingFastPathRoute`].
///
/// Policy is applied first, then the target is resolved with feedback. Local
/// streaming is only reported as possible when policy allows the request
/// without a transform, the resolved target is the device, and the stage
/// reports itself as locally runnable.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
fn resolve_streaming_fast_path_route(
    authority: &dyn OrchestrationAuthority,
    stage: &StageDescriptor,
    model_id: &str,
    envelope: &Envelope,
    metrics: &DeviceMetrics,
) -> StreamingFastPathRoute {
    // --- policy -----------------------------------------------------------
    let policy_request = PolicyRequest {
        stage_id: stage.name.clone(),
        envelope: envelope.clone(),
        metrics: metrics.clone(),
    };
    let policy_decision = authority.apply_policy(&policy_request);
    let policy_allowed = policy_decision.result.is_allowed();
    let policy_transform = matches!(policy_decision.result, PolicyOutcome::Transform { .. });
    let policy_reason = Some(policy_decision.reason.clone());

    // --- routing ----------------------------------------------------------
    let stage_context = StageContext {
        stage_id: stage.name.clone(),
        model_id: model_id.to_string(),
        input_kind: envelope.kind.clone(),
        metrics: metrics.clone(),
        resource_monitor: ResourceMonitor::global(),
        explicit_target: stage.target.clone(),
        local_availability: Some(LocalAvailability::new(stage.is_locally_runnable())),
        device_class: Some(metrics.canonical_device_class()),
        device_class_schema_version: Some(DEVICE_CLASS_SCHEMA_VERSION),
    };
    let resolution = authority.resolve_target_with_feedback(&stage_context);

    let target = match &resolution.decision.result {
        ResolvedTarget::Device => String::from("local"),
        ResolvedTarget::Cloud { .. } => String::from("cloud"),
        ResolvedTarget::Server { endpoint } => format!("fallback:{endpoint}"),
    };
    let hint = resolution.local_reliability_hint.unwrap_or_default();

    // The locally-runnable check stays inside the `&&` chain so it is only
    // evaluated when policy and routing already agree on local execution.
    let routed_to_device = matches!(resolution.decision.result, ResolvedTarget::Device);
    let can_stream_locally =
        policy_allowed && routed_to_device && stage.is_locally_runnable() && !policy_transform;

    let reason = format!(
        "[{}] {} (confidence: {:.0}%)",
        resolution.decision.source,
        resolution.decision.reason,
        resolution.decision.confidence * 100.0
    );

    StreamingFastPathRoute {
        policy_allowed,
        policy_reason,
        target,
        reason,
        recent_abort_rate: hint.recent_abort_rate,
        sample_size: hint.sample_size,
        can_stream_locally,
    }
}
/// Builds the two orchestrator events that describe a fast-path decision:
/// a `PolicyEvaluated` event followed by a `RoutingDecided` event, both
/// tagged with `model_id`.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
fn streaming_fast_path_events(
    stage_name: &str,
    model_id: &str,
    route: &StreamingFastPathRoute,
) -> Vec<OrchestratorEvent> {
    let shared_context = EventContext::default().with_model_id(model_id.to_string());

    let policy_event = OrchestratorEvent::PolicyEvaluated {
        stage_name: stage_name.to_string(),
        allowed: route.policy_allowed,
        reason: route.policy_reason.clone(),
        context: shared_context.clone(),
    };
    let routing_event = OrchestratorEvent::RoutingDecided {
        stage_name: stage_name.to_string(),
        target: route.target.clone(),
        reason: route.reason.clone(),
        recent_abort_rate: route.recent_abort_rate,
        sample_size: route.sample_size,
        context: shared_context,
    };

    vec![policy_event, routing_event]
}
/// Converts the fast-path orchestrator events to telemetry events and
/// publishes each one under the given pipeline and trace ids.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
fn publish_streaming_fast_path_events(
    stage_name: &str,
    model_id: &str,
    route: &StreamingFastPathRoute,
    pipeline_id: Option<uuid::Uuid>,
    trace_id: Option<uuid::Uuid>,
) {
    let events = streaming_fast_path_events(stage_name, model_id, route);
    events.iter().for_each(|event| {
        let telemetry = crate::telemetry::convert_orchestrator_event(event);
        crate::telemetry::publish_telemetry_event_in_context(telemetry, pipeline_id, trace_id);
    });
}
/// Serializes the payload of a `PipelineComplete` telemetry event as a JSON string.
///
/// The object contains a `stages` array (name, latency and target per stage),
/// the `Debug` rendering of `output_type`, and `correlation_id` only when one
/// was supplied.
fn pipeline_complete_data(
    stages: &[StageTiming],
    output_type: &OutputType,
    correlation_id: Option<&str>,
) -> String {
    let mut stage_entries: Vec<serde_json::Value> = Vec::with_capacity(stages.len());
    for timing in stages {
        stage_entries.push(serde_json::json!({
            "name": timing.name,
            "latency_ms": timing.latency_ms,
            "target": timing.target,
        }));
    }

    let mut payload = serde_json::json!({
        "stages": stage_entries,
        "output_type": format!("{:?}", output_type),
    });
    match correlation_id {
        Some(id) => {
            payload["correlation_id"] = serde_json::Value::String(id.to_owned());
        }
        None => {}
    }
    payload.to_string()
}
/// Mutable state shared between clones of a [`Pipeline`].
struct PipelineHandle {
/// One descriptor per configured stage, named after the stage's model id.
stage_descriptors: Vec<StageDescriptor>,
/// Local availability flags; keyed by model id at load time, and by both
/// model id and stage id once models have been loaded.
availability_map: HashMap<String, bool>,
/// Registry URL from the pipeline configuration, if any.
registry_url: Option<String>,
/// Stage configurations copied from the pipeline configuration.
stage_configs: Vec<StageConfig>,
/// Extracted bundle directories, inserted under both stage id and model id
/// by `load_models_with_progress`.
bundle_paths: HashMap<String, PathBuf>,
}
/// A loaded pipeline that can fetch its models and be executed.
///
/// Clones share the same [`PipelineHandle`] through the `Arc`.
pub struct Pipeline {
/// Pipeline name from the configuration, if any.
name: Option<String>,
/// Shared, lock-protected runtime state.
handle: Arc<RwLock<PipelineHandle>>,
/// Stage summaries computed once when the pipeline was loaded.
stages: Vec<StageInfo>,
/// Sum of the download sizes of all stages that needed a download at load time.
total_download_bytes: u64,
}
impl Pipeline {
fn from_ref(ref_: &PipelineRef) -> PipelineResult<Self> {
let config = ref_.config.clone();
let stage_descriptors: Vec<StageDescriptor> = config
.stages
.iter()
.map(|stage_config| {
let name = stage_config.model_id();
let mut desc = StageDescriptor::new(name);
if let Some(target_str) = stage_config.target() {
desc.target = Self::parse_target(target_str);
}
if let Some(provider_str) = stage_config.provider() {
desc.provider = Self::parse_provider(provider_str);
if desc.target.is_none() {
desc.target = Some(ExecutionTarget::Cloud);
}
}
desc.model = Some(stage_config.model_id());
let opts = stage_config.options();
if !opts.is_empty() {
desc.options = Some(Self::convert_options(&opts));
}
desc
})
.collect();
let registry_url = config.registry.clone();
let stage_configs = config.stages.clone();
let mut availability_map = HashMap::new();
let client = if let Some(ref url) = registry_url {
RegistryClient::with_url(url.clone()).ok()
} else {
RegistryClient::from_env().ok()
};
if let Some(ref client) = client {
for stage_config in &stage_configs {
let model_id = stage_config.model_id();
let is_cached = client.is_cached(&model_id, None).unwrap_or(false);
availability_map.insert(model_id, is_cached);
}
}
let handle = PipelineHandle {
stage_descriptors,
availability_map,
registry_url,
stage_configs,
bundle_paths: HashMap::new(),
};
let handle = Arc::new(RwLock::new(handle));
let (stages, total_download_bytes) = Self::resolve_stages(&handle, &config)?;
Ok(Self {
name: config.name,
handle,
stages,
total_download_bytes,
})
}
/// Maps a target string from the configuration to an [`ExecutionTarget`],
/// ignoring case. Unrecognised strings yield `None`.
fn parse_target(target: &str) -> Option<ExecutionTarget> {
    let normalized = target.to_lowercase();
    let parsed = match normalized.as_str() {
        "device" | "local" => ExecutionTarget::Device,
        "server" => ExecutionTarget::Server,
        "cloud" | "integration" | "api" => ExecutionTarget::Cloud,
        "auto" => ExecutionTarget::Auto,
        _ => return None,
    };
    Some(parsed)
}
/// Maps a provider string from the configuration to an [`IntegrationProvider`],
/// ignoring case. Always returns `Some`: unrecognised names become
/// [`IntegrationProvider::Custom`].
fn parse_provider(provider: &str) -> Option<IntegrationProvider> {
    let normalized = provider.to_lowercase();
    let resolved = match normalized.as_str() {
        "openai" => IntegrationProvider::OpenAI,
        "anthropic" | "claude" => IntegrationProvider::Anthropic,
        "google" | "gemini" => IntegrationProvider::Google,
        "elevenlabs" | "eleven" | "eleven_labs" => IntegrationProvider::ElevenLabs,
        "openrouter" | "open_router" => IntegrationProvider::OpenRouter,
        "deepseek" | "deep_seek" => IntegrationProvider::DeepSeek,
        _ => IntegrationProvider::Custom,
    };
    Some(resolved)
}
/// Copies scalar JSON option values (numbers, strings, booleans) into a
/// [`StageOptions`]. Arrays, objects and nulls are ignored.
fn convert_options(options: &HashMap<String, serde_json::Value>) -> StageOptions {
    let mut converted = StageOptions::new();
    options.iter().for_each(|(key, value)| match value {
        serde_json::Value::Bool(flag) => {
            converted.set(key, *flag);
        }
        serde_json::Value::String(text) => {
            converted.set(key, text.clone());
        }
        serde_json::Value::Number(number) => {
            // NOTE(review): `as_f64` appears to succeed for every JSON number
            // (integers included), so the `as_u64` fallback is presumably never
            // taken and integer options end up stored as `f64` — confirm
            // whether that is intended.
            if let Some(as_float) = number.as_f64() {
                converted.set(key, as_float);
            } else if let Some(as_unsigned) = number.as_u64() {
                converted.set(key, as_unsigned as u32);
            }
        }
        _ => {}
    });
    converted
}
/// Builds the [`StageInfo`] list for `config` and sums the bytes that still
/// have to be downloaded.
///
/// Each stage is classified from its `provider` and `target` settings:
/// * any provider, or target `integration` / `api` → [`StageTarget::Integration`]
///   (provider `"unknown"` when none was given);
/// * target `cloud` / `server` → [`StageTarget::Cloud`];
/// * target `device` / `local` / `edge` → [`StageTarget::Device`];
/// * anything else → [`StageTarget::Auto`].
///
/// Target strings are compared case-insensitively and `api` is grouped with
/// `integration`, so the classification agrees with `parse_target`, which
/// lowercases its input and treats both spellings alike. Previously a target
/// such as `Device` produced a device descriptor but an `Auto` stage summary.
///
/// Device and auto stages are looked up in the registry: a resolved model is
/// `Cached` or `NeedsDownload` (with its size); a failed lookup is `Cached`
/// when the handle's availability map already marks the model as available,
/// and `Error` otherwise. All other stages are `Integration`.
///
/// # Errors
/// Returns an error when the registry client cannot be created or the handle
/// lock is poisoned.
fn resolve_stages(
    handle: &Arc<RwLock<PipelineHandle>>,
    config: &PipelineConfig,
) -> PipelineResult<(Vec<StageInfo>, u64)> {
    let client = match config.registry.clone() {
        Some(url) => RegistryClient::with_url(url)?,
        None => RegistryClient::from_env()?,
    };
    let handle_read = handle
        .read()
        .map_err(|_| SdkError::PipelineError("Failed to read pipeline handle".to_string()))?;

    let mut stages = Vec::with_capacity(config.stages.len());
    let mut total_download_bytes: u64 = 0;

    for stage_config in &config.stages {
        let model_id = stage_config.model_id();
        let provider = stage_config.provider();
        // Lowercased once so every comparison below is case-insensitive.
        let target_lower = stage_config.target().map(|t| t.to_lowercase());

        let stage_target = match (provider, target_lower.as_deref()) {
            (Some(name), _) => StageTarget::Integration {
                provider: name.to_string(),
            },
            (None, Some("integration" | "api")) => StageTarget::Integration {
                provider: "unknown".to_string(),
            },
            (None, Some("cloud" | "server")) => StageTarget::Cloud,
            (None, Some("device" | "local" | "edge")) => StageTarget::Device,
            (None, _) => StageTarget::Auto,
        };

        let (status, download_bytes) = match &stage_target {
            StageTarget::Device | StageTarget::Auto => match client.resolve(&model_id, None) {
                Ok(resolved) => {
                    if client.is_cached(&model_id, None).unwrap_or(false) {
                        (StageStatus::Cached, None)
                    } else {
                        total_download_bytes += resolved.size_bytes;
                        (StageStatus::NeedsDownload, Some(resolved.size_bytes))
                    }
                }
                Err(e) => {
                    // The registry lookup failed; fall back to what the cache
                    // reported when the pipeline handle was built.
                    let known_cached = handle_read
                        .availability_map
                        .get(&model_id)
                        .copied()
                        .unwrap_or(false);
                    if known_cached {
                        (StageStatus::Cached, None)
                    } else {
                        (StageStatus::Error(e.to_string()), None)
                    }
                }
            },
            StageTarget::Cloud | StageTarget::Integration { .. } => {
                (StageStatus::Integration, None)
            }
        };

        stages.push(StageInfo {
            id: stage_config.stage_id(),
            model_id: Some(model_id),
            target: stage_target,
            status,
            download_bytes,
        });
    }

    Ok((stages, total_download_bytes))
}
/// Pipeline name from the configuration, if one was given.
pub fn name(&self) -> Option<&str> {
    self.name.as_ref().map(|n| n.as_str())
}
/// Stage identifiers in pipeline order.
pub fn stage_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.stages.len());
    for stage in &self.stages {
        names.push(stage.id.clone());
    }
    names
}
/// Stage summaries computed when the pipeline was loaded, in pipeline order.
pub fn stages(&self) -> &[StageInfo] {
&self.stages
}
/// Number of stages in the pipeline.
pub fn stage_count(&self) -> usize {
self.stages.len()
}
/// Input kind expected by the pipeline.
///
/// Currently always returns [`PipelineInputType::Unknown`]; no detection is performed.
pub fn input_type(&self) -> PipelineInputType {
PipelineInputType::Unknown
}
pub fn is_ready(&self) -> bool {
let Ok(handle) = self.handle.read() else {
return false;
};
self.stages.iter().all(|s| match s.status {
StageStatus::Cached => handle.bundle_paths.contains_key(&s.id),
StageStatus::Integration => true,
_ => false,
})
}
/// Total bytes of all stages that needed a download when the pipeline was loaded.
pub fn download_size(&self) -> u64 {
self.total_download_bytes
}
/// Stages whose status was `NeedsDownload` when the pipeline was loaded.
pub fn stages_needing_download(&self) -> Vec<&StageInfo> {
    let mut pending = Vec::new();
    for stage in &self.stages {
        if stage.status == StageStatus::NeedsDownload {
            pending.push(stage);
        }
    }
    pending
}
pub fn load_models(&self) -> PipelineResult<()> {
self.load_models_with_progress(|_| {})
}
/// Fetches the model bundle of every stage whose status is `Cached` or
/// `NeedsDownload` and records the extracted directory in the shared handle.
///
/// `progress_callback` is invoked with a [`DownloadProgress`] for stages that
/// need a download and that the local authority routes to the device. Stages
/// routed to cloud or server are skipped (and only reported on stderr in
/// debug builds).
///
/// # Errors
/// Returns an error when the handle lock cannot be read, the registry client
/// cannot be created, or fetching a model fails.
pub fn load_models_with_progress<F>(&self, progress_callback: F) -> PipelineResult<()>
where
F: Fn(DownloadProgress),
{
// Copy the registry URL out so the read lock is released immediately.
let registry_url = self
.handle
.read()
.map_err(|_| SdkError::PipelineError("Failed to read handle".to_string()))?
.registry_url
.clone();
let client = if let Some(url) = registry_url {
RegistryClient::with_url(url)?
} else {
RegistryClient::from_env()?
};
let authority = LocalAuthority::new();
// Routing decisions below are made against default device metrics.
let metrics = DeviceMetrics::default();
// Snapshot of (index, stage id, model id, download size, target, status) for
// every stage that has a model to fetch; stages without a model id are dropped.
let stages_to_fetch: Vec<_> = self
.stages
.iter()
.enumerate()
.filter(|(_, s)| matches!(s.status, StageStatus::Cached | StageStatus::NeedsDownload))
.filter_map(|(idx, s)| {
s.model_id.as_ref().map(|m| {
(
idx,
s.id.clone(),
m.clone(),
s.download_bytes.unwrap_or(0),
s.target.clone(),
s.status.clone(),
)
})
})
.collect();
// Only stages that actually need a download count towards the progress total.
let total_stages = stages_to_fetch
.iter()
.filter(|(_, _, _, _, _, status)| matches!(status, StageStatus::NeedsDownload))
.count();
let mut skipped_count = 0;
let mut download_stage_idx = 0;
for (_, stage_id, model_id, total_bytes, stage_target, stage_status) in stages_to_fetch {
if matches!(stage_status, StageStatus::Cached) {
// Already cached: fetch with a no-op progress callback and record the
// directory under both the stage id and the model id.
// NOTE(review): presumably this resolves from the local cache without
// downloading — confirm against `RegistryClient::fetch_extracted`.
let model_dir = client.fetch_extracted(&model_id, None, |_| {})?;
// A poisoned lock is recovered rather than treated as an error.
let mut handle = self.handle.write().unwrap_or_else(|e| e.into_inner());
handle.availability_map.insert(model_id.clone(), true);
handle.availability_map.insert(stage_id.clone(), true);
handle
.bundle_paths
.insert(stage_id.clone(), model_dir.clone());
handle.bundle_paths.insert(model_id, model_dir);
continue;
}
// Position of this stage among the stages that need a download.
let stage_idx = download_stage_idx;
download_stage_idx += 1;
let explicit_target = match &stage_target {
StageTarget::Device => Some(ExecutionTarget::Device),
StageTarget::Auto => None, StageTarget::Cloud => Some(ExecutionTarget::Cloud),
StageTarget::Integration { .. } => Some(ExecutionTarget::Cloud),
};
// The input kind is a placeholder (empty text); no real input exists at load time.
let stage_context = StageContext {
stage_id: stage_id.clone(),
model_id: model_id.clone(),
input_kind: EnvelopeKind::Text("".to_string()), metrics: metrics.clone(),
resource_monitor: ResourceMonitor::global(),
explicit_target,
local_availability: None,
device_class: Some(metrics.canonical_device_class()),
device_class_schema_version: Some(DEVICE_CLASS_SCHEMA_VERSION),
};
// Ask the authority where this stage would run; only device-routed stages
// are downloaded.
let decision = authority.resolve_target(&stage_context);
match decision.result {
ResolvedTarget::Device => {
// Translate the fetch fraction into a `DownloadProgress` report.
let progress_for_model = |download_progress: f32| {
let bytes_downloaded = (download_progress * total_bytes as f32) as u64;
progress_callback(DownloadProgress {
model_id: model_id.clone(),
percent: (download_progress * 100.0) as u32,
bytes_downloaded,
bytes_total: total_bytes,
stage_index: stage_idx,
total_stages,
});
};
let model_dir = client.fetch_extracted(&model_id, None, progress_for_model)?;
{
// Scoped so the write lock is released before the next iteration.
let mut handle = self.handle.write().unwrap_or_else(|e| e.into_inner());
handle.availability_map.insert(model_id.clone(), true);
handle.availability_map.insert(stage_id.clone(), true);
handle
.bundle_paths
.insert(stage_id.clone(), model_dir.clone());
handle.bundle_paths.insert(model_id.clone(), model_dir);
}
}
ResolvedTarget::Cloud { .. } | ResolvedTarget::Server { .. } => {
// Not an error: the stage will run remotely, so nothing is downloaded.
skipped_count += 1;
#[cfg(debug_assertions)]
eprintln!(
"[pipeline] Skipping download for '{}': authority routed to {:?} ({})",
model_id, decision.result, decision.reason
);
}
}
}
if skipped_count > 0 {
#[cfg(debug_assertions)]
eprintln!(
"[pipeline] Skipped {} downloads based on authority routing decisions",
skipped_count
);
}
Ok(())
}
/// Loads the models if necessary and runs the pipeline once with the text
/// input `"Hi"`, logging how long the run took.
///
/// # Errors
/// Propagates errors from loading models or from the warm-up run.
pub fn warmup(&self) -> PipelineResult<()> {
    log::info!(target: "xybrid_sdk", "Warming up pipeline: {:?}", self.name);

    let models_missing = !self.is_ready();
    if models_missing {
        self.load_models()?;
    }

    let probe = Envelope {
        kind: EnvelopeKind::Text(String::from("Hi")),
        metadata: Default::default(),
    };
    let timer = std::time::Instant::now();
    self.run(&probe)?;
    let took = timer.elapsed();

    log::info!(
        target: "xybrid_sdk",
        "Pipeline {:?} warmed up in {:?}",
        self.name,
        took
    );
    Ok(())
}
/// Async variant of `warmup`: loads the models if necessary and runs the
/// pipeline once through `run_async` with the text input `"Hi"`.
///
/// # Errors
/// Propagates errors from loading models or from the warm-up run.
pub async fn warmup_async(&self) -> PipelineResult<()> {
    log::info!(target: "xybrid_sdk", "Warming up pipeline (async): {:?}", self.name);

    // NOTE(review): `load_models` is a synchronous call made directly inside
    // this async fn — confirm that blocking the calling task here is acceptable.
    let models_missing = !self.is_ready();
    if models_missing {
        self.load_models()?;
    }

    let probe = Envelope {
        kind: EnvelopeKind::Text(String::from("Hi")),
        metadata: Default::default(),
    };
    let timer = std::time::Instant::now();
    self.run_async(&probe).await?;
    let took = timer.elapsed();

    log::info!(
        target: "xybrid_sdk",
        "Pipeline {:?} warmed up (async) in {:?}",
        self.name,
        took
    );
    Ok(())
}
/// Runs the pipeline on `envelope` with default [`RunOptions`].
pub fn run(&self, envelope: &Envelope) -> PipelineResult<PipelineExecutionResult> {
    let default_options = RunOptions::default();
    self.run_with_options(envelope, &default_options)
}
/// Runs every stage of the pipeline on `envelope` and returns per-stage
/// timings plus the output of the last stage.
///
/// Models are loaded first when the pipeline is not ready. Execution goes
/// through a fresh `Orchestrator`; on success a `PipelineComplete` telemetry
/// event is published together with the resource summary of the run.
///
/// # Errors
/// Returns an error when loading models fails, the handle lock is poisoned,
/// the orchestrator event bridge fails, or pipeline execution fails.
pub fn run_with_options(
&self,
envelope: &Envelope,
options: &RunOptions,
) -> PipelineResult<PipelineExecutionResult> {
if !self.is_ready() {
self.load_models()?;
}
let handle = self
.handle
.read()
.map_err(|_| SdkError::PipelineError("Failed to acquire pipeline lock".to_string()))?;
// Work on a copy of the descriptors so bundle paths can be attached without
// holding the lock during execution.
let mut stage_descriptors = handle.stage_descriptors.clone();
for desc in &mut stage_descriptors {
if let Some(bundle_path) = handle.bundle_paths.get(&desc.name) {
desc.bundle_path = Some(bundle_path.to_string_lossy().to_string());
}
}
// Availability is derived from the descriptors themselves, not from the
// handle's availability map.
let availability_map: HashMap<String, bool> = stage_descriptors
.iter()
.map(|stage| (stage.name.clone(), stage.is_locally_runnable()))
.collect();
// Release the read lock before the (potentially long) execution.
drop(handle);
let metrics = pipeline_metrics(options);
// A fresh random trace id per run; the pipeline id is a name-based (v5) UUID,
// so it is the same for every run of a pipeline with the same name.
let trace_id = uuid::Uuid::new_v4();
let pipeline_id = self
.name
.as_ref()
.map(|n| uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, n.as_bytes()));
// The guard is held in a local for the remainder of this function.
let _context_guard =
crate::telemetry::TelemetryPipelineContextGuard::install(pipeline_id, Some(trace_id));
let mut orchestrator = Orchestrator::new();
let bridge = crate::telemetry::bridge_orchestrator_events(&orchestrator);
// Stages missing from the map are reported as not locally available.
let availability_fn = move |stage: &str| -> LocalAvailability {
let exists = availability_map.get(stage).copied().unwrap_or(false);
LocalAvailability::new(exists)
};
let start_time = std::time::Instant::now();
let resource_guard = crate::telemetry::begin_resource_run();
let execution_result =
orchestrator.execute_pipeline(&stage_descriptors, envelope, &metrics, &availability_fn);
// NOTE(review): the orchestrator is dropped before joining the bridge,
// presumably so the bridge sees the end of the event stream — confirm.
drop(orchestrator);
// A bridge failure is reported first, even if execution itself also failed.
bridge.join().map_err(|e| {
SdkError::PipelineError(format!("Orchestrator event bridge failed: {}", e))
})?;
let results: Vec<StageExecutionResult> = execution_result
.map_err(|e| SdkError::PipelineError(format!("Pipeline execution failed: {}", e)))?;
// The `as u32` cast truncates if the elapsed milliseconds exceed `u32::MAX`.
let total_latency_ms = start_time.elapsed().as_millis() as u32;
let stages: Vec<StageTiming> = results
.iter()
.map(|result| StageTiming {
name: result.stage.clone(),
latency_ms: result.latency_ms,
target: result.routing_decision.target.to_string(),
reason: result.routing_decision.reason.clone(),
})
.collect();
// The pipeline output is the last stage's output; with no stage results the
// output is an empty text envelope of unknown type.
let (output_type, output) = if let Some(last) = results.last() {
let output_type = match &last.output.kind {
EnvelopeKind::Text(_) => OutputType::Text,
EnvelopeKind::Audio(_) => OutputType::Audio,
EnvelopeKind::Embedding(_) => OutputType::Embedding,
};
(output_type, last.output.clone())
} else {
(
OutputType::Unknown,
Envelope::new(EnvelopeKind::Text(String::new())),
)
};
// Single-stage runs tag the event with that stage's name and target;
// otherwise the pipeline name is used and no target is reported.
let (event_stage_name, event_target) = if results.len() == 1 {
let only = &results[0];
(
Some(only.stage.clone()),
Some(only.routing_decision.target.to_string()),
)
} else {
(self.name.clone(), None)
};
let event = crate::telemetry::TelemetryEvent {
event_type: "PipelineComplete".to_string(),
stage_name: event_stage_name,
target: event_target,
latency_ms: Some(total_latency_ms),
error: None,
data: Some(pipeline_complete_data(
&stages,
&output_type,
options.correlation_id.as_deref(),
)),
// Falls back to 0 if the system clock is before the Unix epoch.
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
};
crate::telemetry::publish_with_resource_summary_in_context(
event,
resource_guard,
pipeline_id,
Some(trace_id),
);
Ok(PipelineExecutionResult {
name: self.name.clone(),
stages,
total_latency_ms,
output_type,
output,
})
}
/// Runs the pipeline asynchronously on `envelope` with default [`RunOptions`].
pub async fn run_async(&self, envelope: &Envelope) -> PipelineResult<PipelineExecutionResult> {
    let default_options = RunOptions::default();
    self.run_async_with_options(envelope, &default_options).await
}
/// Async variant of `run_with_options`: prepares the stage descriptors on the
/// calling task and performs the pipeline execution inside
/// `tokio::task::spawn_blocking`.
///
/// # Errors
/// Returns an error when loading models fails, the blocking task cannot be
/// joined, the orchestrator event bridge fails, or pipeline execution fails.
pub async fn run_async_with_options(
&self,
envelope: &Envelope,
options: &RunOptions,
) -> PipelineResult<PipelineExecutionResult> {
// NOTE(review): `load_models` is a synchronous call made on the async caller
// before the blocking task is spawned — confirm this is acceptable.
if !self.is_ready() {
self.load_models()?;
}
let (stage_descriptors, availability_map) = {
// A poisoned lock is recovered here instead of being reported as an error.
let handle = self.handle.read().unwrap_or_else(|e| e.into_inner());
let mut descriptors = handle.stage_descriptors.clone();
for desc in &mut descriptors {
if let Some(bundle_path) = handle.bundle_paths.get(&desc.name) {
desc.bundle_path = Some(bundle_path.to_string_lossy().to_string());
}
}
let availability_map: HashMap<String, bool> = descriptors
.iter()
.map(|stage| (stage.name.clone(), stage.is_locally_runnable()))
.collect();
(descriptors, availability_map)
};
// Owned copies are needed because the closure below is moved to another thread.
let envelope_clone = envelope.clone();
let name = self.name.clone();
let options = options.clone();
tokio::task::spawn_blocking(move || {
let metrics = pipeline_metrics(&options);
// A fresh random trace id per run; the pipeline id is a name-based (v5) UUID.
let trace_id = uuid::Uuid::new_v4();
let pipeline_id = name
.as_ref()
.map(|n| uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, n.as_bytes()));
// The guard is held in a local for the remainder of the closure.
let _context_guard = crate::telemetry::TelemetryPipelineContextGuard::install(
pipeline_id,
Some(trace_id),
);
let mut orchestrator = Orchestrator::new();
let bridge = crate::telemetry::bridge_orchestrator_events(&orchestrator);
// Stages missing from the map are reported as not locally available.
let availability_fn = move |stage: &str| -> LocalAvailability {
let exists = availability_map.get(stage).copied().unwrap_or(false);
LocalAvailability::new(exists)
};
let start_time = std::time::Instant::now();
let resource_guard = crate::telemetry::begin_resource_run();
let execution_result = orchestrator.execute_pipeline(
&stage_descriptors,
&envelope_clone,
&metrics,
&availability_fn,
);
// NOTE(review): the orchestrator is dropped before joining the bridge,
// presumably so the bridge sees the end of the event stream — confirm.
drop(orchestrator);
// A bridge failure is reported first, even if execution itself also failed.
bridge.join().map_err(|e| {
SdkError::PipelineError(format!("Orchestrator event bridge failed: {}", e))
})?;
let results: Vec<StageExecutionResult> = execution_result.map_err(|e| {
SdkError::PipelineError(format!("Pipeline execution failed: {}", e))
})?;
// The `as u32` cast truncates if the elapsed milliseconds exceed `u32::MAX`.
let total_latency_ms = start_time.elapsed().as_millis() as u32;
let stages: Vec<StageTiming> = results
.iter()
.map(|result| StageTiming {
name: result.stage.clone(),
latency_ms: result.latency_ms,
target: result.routing_decision.target.to_string(),
reason: result.routing_decision.reason.clone(),
})
.collect();
// The pipeline output is the last stage's output; with no stage results the
// output is an empty text envelope of unknown type.
let (output_type, output) = if let Some(last) = results.last() {
let output_type = match &last.output.kind {
EnvelopeKind::Text(_) => OutputType::Text,
EnvelopeKind::Audio(_) => OutputType::Audio,
EnvelopeKind::Embedding(_) => OutputType::Embedding,
};
(output_type, last.output.clone())
} else {
(
OutputType::Unknown,
Envelope::new(EnvelopeKind::Text(String::new())),
)
};
// Single-stage runs tag the event with that stage's name and target;
// otherwise the pipeline name is used and no target is reported.
let (event_stage_name, event_target) = if results.len() == 1 {
let only = &results[0];
(
Some(only.stage.clone()),
Some(only.routing_decision.target.to_string()),
)
} else {
(name.clone(), None)
};
let event = crate::telemetry::TelemetryEvent {
event_type: "PipelineComplete".to_string(),
stage_name: event_stage_name,
target: event_target,
latency_ms: Some(total_latency_ms),
error: None,
data: Some(pipeline_complete_data(
&stages,
&output_type,
options.correlation_id.as_deref(),
)),
// Falls back to 0 if the system clock is before the Unix epoch.
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
};
crate::telemetry::publish_with_resource_summary_in_context(
event,
resource_guard,
pipeline_id,
Some(trace_id),
);
Ok(PipelineExecutionResult {
name,
stages,
total_latency_ms,
output_type,
output,
})
})
.await
// The outer `?` surfaces a join failure; the closure's own result is returned as is.
.map_err(|e| SdkError::PipelineError(format!("Task join error: {}", e)))?
}
}
impl Clone for Pipeline {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
handle: self.handle.clone(),
stages: self.stages.clone(),
total_download_bytes: self.total_download_bytes,
}
}
}
/// Namespace type for one-shot convenience functions that parse, load and run
/// a pipeline from YAML.
pub struct Xybrid;
impl Xybrid {
/// Parses `yaml`, loads the pipeline and runs it on `envelope` with default
/// [`RunOptions`].
pub fn run_pipeline(
    yaml: &str,
    envelope: &Envelope,
) -> PipelineResult<PipelineExecutionResult> {
    let default_options = RunOptions::default();
    Self::run_pipeline_with_options(yaml, envelope, &default_options)
}
/// Parses `yaml`, loads the pipeline and runs it on `envelope` with `options`.
///
/// # Errors
/// Propagates parse, load and execution errors.
pub fn run_pipeline_with_options(
    yaml: &str,
    envelope: &Envelope,
    options: &RunOptions,
) -> PipelineResult<PipelineExecutionResult> {
    let pipeline_ref = PipelineRef::from_yaml(yaml)?;
    let pipeline = pipeline_ref.load()?;
    pipeline.run_with_options(envelope, options)
}
/// Parses `yaml` into a [`PipelineRef`] without loading it.
pub fn pipeline(yaml: &str) -> PipelineResult<PipelineRef> {
PipelineRef::from_yaml(yaml)
}
/// Runs a pipeline from YAML with streaming token callbacks, using default
/// [`RunOptions`].
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
pub fn run_pipeline_streaming<'a>(
    yaml: &str,
    envelope: &Envelope,
    on_token: xybrid_core::runtime_adapter::types::StreamingCallback<'a>,
) -> PipelineResult<PipelineExecutionResult> {
    let default_options = RunOptions::default();
    Self::run_pipeline_streaming_with_options(yaml, envelope, &default_options, on_token)
}
/// Runs a pipeline from YAML, streaming tokens through `on_token` when the
/// single-stage fast path applies.
///
/// The fast path is taken only when the pipeline has exactly one stage, a
/// bundle path is recorded under that stage's name, the bundle contains a
/// `model_metadata.json` whose execution template is `Gguf`, and policy and
/// routing allow local streaming. In every other case the call falls back to
/// `Pipeline::run_with_options` and `on_token` is not passed on.
///
/// # Errors
/// Returns an error when parsing, loading or model loading fails, when the
/// handle lock is poisoned, when the metadata file exists but cannot be read
/// or parsed, or when streaming inference fails.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
pub fn run_pipeline_streaming_with_options<'a>(
yaml: &str,
envelope: &Envelope,
options: &RunOptions,
on_token: xybrid_core::runtime_adapter::types::StreamingCallback<'a>,
) -> PipelineResult<PipelineExecutionResult> {
use xybrid_core::execution::{ModelMetadata, TemplateExecutor};
let pipeline_ref = PipelineRef::from_yaml(yaml)?;
let pipeline = pipeline_ref.load()?;
if !pipeline.is_ready() {
pipeline.load_models()?;
}
let handle = pipeline
.handle
.read()
.map_err(|_| SdkError::PipelineError("Failed to acquire pipeline lock".to_string()))?;
// The fast path only handles single-stage pipelines.
if handle.stage_descriptors.len() == 1 {
let stage_name = handle.stage_descriptors[0].name.clone();
if let Some(bundle_path) = handle.bundle_paths.get(&stage_name) {
let bundle_path = bundle_path.clone(); let stage_descriptor =
stage_descriptor_with_bundle_path(&handle.stage_descriptors[0], &bundle_path);
let metadata_path = bundle_path.join("model_metadata.json");
// A missing metadata file is not an error: the regular path is used instead.
if metadata_path.exists() {
let metadata_str = std::fs::read_to_string(&metadata_path).map_err(|e| {
SdkError::PipelineError(format!("Failed to read metadata: {}", e))
})?;
let metadata: ModelMetadata =
serde_json::from_str(&metadata_str).map_err(|e| {
SdkError::PipelineError(format!("Failed to parse metadata: {}", e))
})?;
// Only GGUF models are streamed directly.
if matches!(
metadata.execution_template,
xybrid_core::execution::ExecutionTemplate::Gguf { .. }
) {
let model_id = metadata.model_id.clone();
let metrics = pipeline_metrics(options);
// The authority is given a cache provider that knows only this model and
// reports the bundle directory as its path.
let authority = LocalAuthority::with_cache_provider(Arc::new(
StreamingFastPathCacheProvider::new(
model_id.clone(),
bundle_path.clone(),
),
));
let route = resolve_streaming_fast_path_route(
&authority,
&stage_descriptor,
&model_id,
envelope,
&metrics,
);
// Policy or routing ruled out local streaming: release the lock and run normally.
if !route.can_stream_locally {
drop(handle);
return pipeline.run_with_options(envelope, options);
}
// Release the read lock before the (potentially long) inference.
drop(handle);
// A fresh random trace id per run; the pipeline id is a name-based (v5) UUID.
let trace_id = uuid::Uuid::new_v4();
let pipeline_id = pipeline
.name
.as_ref()
.map(|n| uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, n.as_bytes()));
// The guard is held in a local until this block returns.
let _context_guard =
crate::telemetry::TelemetryPipelineContextGuard::install(
pipeline_id,
Some(trace_id),
);
// Publish the policy and routing events built from `route`.
publish_streaming_fast_path_events(
&stage_name,
&model_id,
&route,
pipeline_id,
Some(trace_id),
);
// NOTE(review): a bundle path that is not valid UTF-8 silently becomes an
// empty base path here — confirm this is intended.
let mut executor =
TemplateExecutor::with_base_path(bundle_path.to_str().unwrap_or(""));
let start_time = std::time::Instant::now();
let output = executor
.execute_streaming(&metadata, envelope, on_token, None)
.map_err(|e| SdkError::InferenceError(format!("{}", e)))?;
// The `as u32` cast truncates if the elapsed milliseconds exceed `u32::MAX`.
let total_latency_ms = start_time.elapsed().as_millis() as u32;
let output_type = match &output.kind {
EnvelopeKind::Text(_) => OutputType::Text,
EnvelopeKind::Audio(_) => OutputType::Audio,
EnvelopeKind::Embedding(_) => OutputType::Embedding,
};
// The fast path reports a `ModelComplete` event tagged with the metadata's model id.
let event = crate::telemetry::TelemetryEvent {
event_type: "ModelComplete".to_string(),
stage_name: Some(model_id.clone()),
target: Some(route.target.clone()),
latency_ms: Some(total_latency_ms),
error: None,
data: Some(
serde_json::json!({
"model_id": model_id,
"version": metadata.version,
"output_type": format!("{:?}", output_type),
"streaming": true,
})
.to_string(),
),
// Falls back to 0 if the system clock is before the Unix epoch.
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
};
crate::telemetry::publish_telemetry_event_in_context(
event,
pipeline_id,
Some(trace_id),
);
// The single stage timing covers the whole streaming call.
return Ok(PipelineExecutionResult {
name: pipeline.name.clone(),
stages: vec![StageTiming {
name: pipeline_ref.config.stages[0].model_id(),
latency_ms: total_latency_ms,
target: route.target,
reason: route.reason,
}],
total_latency_ms,
output_type,
output,
});
}
}
}
}
// Fast path not applicable: release the lock and run the regular pipeline.
drop(handle);
pipeline.run_with_options(envelope, options)
}
/// Non-streaming stand-in for builds compiled without the `llm-mistral` and
/// `llm-llamacpp` features.
///
/// The pipeline described by `yaml` is executed through `Self::run_pipeline`
/// with `envelope` as input. `on_token` is accepted so the entry point stays
/// callable in this configuration, but it is never invoked.
///
/// # Errors
///
/// Returns whatever error `Self::run_pipeline` produces.
#[cfg(not(any(feature = "llm-mistral", feature = "llm-llamacpp")))]
// `on_token` is deliberately unused: this build has no streaming backend.
#[allow(unused_variables)]
pub fn run_pipeline_streaming<'a>(
    yaml: &str,
    envelope: &Envelope,
    on_token: xybrid_core::runtime_adapter::types::StreamingCallback<'a>,
) -> PipelineResult<PipelineExecutionResult> {
    // No token streaming is available here; run the whole pipeline in one shot.
    Self::run_pipeline(yaml, envelope)
}
/// Non-streaming stand-in (with run options) for builds compiled without the
/// `llm-mistral` and `llm-llamacpp` features.
///
/// Forwards `yaml`, `envelope` and `options` to
/// `Self::run_pipeline_with_options`. `on_token` is accepted so the entry
/// point stays callable in this configuration, but it is never invoked.
///
/// # Errors
///
/// Returns whatever error `Self::run_pipeline_with_options` produces.
#[cfg(not(any(feature = "llm-mistral", feature = "llm-llamacpp")))]
// `on_token` is deliberately unused: this build has no streaming backend.
#[allow(unused_variables)]
pub fn run_pipeline_streaming_with_options<'a>(
    yaml: &str,
    envelope: &Envelope,
    options: &RunOptions,
    on_token: xybrid_core::runtime_adapter::types::StreamingCallback<'a>,
) -> PipelineResult<PipelineExecutionResult> {
    // No token streaming is available here; run the whole pipeline in one shot.
    Self::run_pipeline_with_options(yaml, envelope, options)
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A minimal single-stage YAML document parses into a `PipelineRef` that
/// reports the declared name and exactly one stage.
#[test]
fn test_pipeline_ref_from_yaml() {
    let source = r#"
name: "Test Pipeline"
stages:
- test-stage@1.0
"#;

    let pipeline_ref = PipelineRef::from_yaml(source).unwrap();

    assert_eq!(pipeline_ref.name(), Some("Test Pipeline"));
    assert_eq!(pipeline_ref.stage_count(), 1);
}
/// When a correlation id is supplied, the serialized completion payload
/// carries it alongside the output type and per-stage timing fields.
#[test]
fn pipeline_complete_data_includes_correlation_id_when_options_set() {
    let timings = vec![StageTiming {
        name: "llm".to_string(),
        latency_ms: 12,
        target: "local".to_string(),
        reason: "local_available".to_string(),
    }];

    let payload = pipeline_complete_data(&timings, &OutputType::Text, Some("run-abc"));
    let parsed: serde_json::Value = serde_json::from_str(&payload).unwrap();

    // Top-level fields.
    assert_eq!(parsed["correlation_id"], "run-abc");
    assert_eq!(parsed["output_type"], "Text");

    // Fields of the single stage entry.
    let first_stage = &parsed["stages"][0];
    assert_eq!(first_stage["name"], "llm");
    assert_eq!(first_stage["latency_ms"], 12);
    assert_eq!(first_stage["target"], "local");
}
/// Without a correlation id, the serialized completion payload has no
/// `correlation_id` key while still reporting output type and stage target.
#[test]
fn pipeline_complete_data_omits_correlation_id_when_options_unset() {
    let timings = vec![StageTiming {
        name: "asr".to_string(),
        latency_ms: 7,
        target: "device".to_string(),
        reason: "local_available".to_string(),
    }];

    let payload = pipeline_complete_data(&timings, &OutputType::Audio, None);
    let parsed: serde_json::Value = serde_json::from_str(&payload).unwrap();

    let correlation = parsed.get("correlation_id");
    assert!(correlation.is_none());
    assert_eq!(parsed["output_type"], "Audio");

    let first_stage = &parsed["stages"][0];
    assert_eq!(first_stage["target"], "device");
}
/// Compile-time check: `Xybrid::run_pipeline_with_options` must coerce to the
/// expected function-pointer signature. There is nothing to assert at runtime.
#[test]
fn xybrid_run_pipeline_with_options_is_public_api() {
    type EntryPoint =
        fn(&str, &Envelope, &RunOptions) -> PipelineResult<PipelineExecutionResult>;

    let _entry_point: EntryPoint = Xybrid::run_pipeline_with_options;
}
/// Compile-time check: `Xybrid::run_pipeline_streaming_with_options` must
/// coerce to the expected function-pointer signature for any callback
/// lifetime. There is nothing to assert at runtime.
#[test]
fn xybrid_run_pipeline_streaming_with_options_is_public_api() {
    type StreamingEntryPoint = for<'a> fn(
        &str,
        &Envelope,
        &RunOptions,
        xybrid_core::runtime_adapter::types::StreamingCallback<'a>,
    ) -> PipelineResult<PipelineExecutionResult>;

    let _entry_point: StreamingEntryPoint = Xybrid::run_pipeline_streaming_with_options;
}
/// A device-targeted stage whose bundle path points at a temp directory is
/// allowed by policy and resolves to local streaming, with no abort samples
/// recorded and an "Explicit target" reason.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[test]
fn streaming_fast_path_route_uses_policy_and_local_routing() {
    let model_id = "streaming-local-model";
    let scratch = tempfile::tempdir().unwrap();
    let bundle_dir = scratch.path();

    let cache = Arc::new(StreamingFastPathCacheProvider::new(
        model_id,
        bundle_dir.to_path_buf(),
    ));
    let authority = LocalAuthority::with_cache_provider(cache);

    let descriptor = StageDescriptor::new("llm")
        .with_model(model_id)
        .with_bundle_path(bundle_dir.to_string_lossy().to_string())
        .with_target(ExecutionTarget::Device);
    let prompt = Envelope::new(EnvelopeKind::Text("prompt".to_string()));
    let device_metrics = DeviceMetrics::default();

    let decision = resolve_streaming_fast_path_route(
        &authority,
        &descriptor,
        model_id,
        &prompt,
        &device_metrics,
    );

    assert!(decision.policy_allowed);
    assert!(decision.can_stream_locally);
    assert_eq!(decision.target, "local");
    assert_eq!(decision.sample_size, 0);
    assert!(decision.reason.contains("Explicit target"));
}
/// A descriptor without a bundle path is not locally runnable; the copy
/// produced by `stage_descriptor_with_bundle_path` is.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[test]
fn streaming_fast_path_descriptor_uses_loaded_bundle_path() {
    let scratch = tempfile::tempdir().unwrap();

    let without_bundle = StageDescriptor::new("llm")
        .with_model("streaming-local-model")
        .with_target(ExecutionTarget::Device);
    assert!(!without_bundle.is_locally_runnable());

    let with_bundle = stage_descriptor_with_bundle_path(&without_bundle, scratch.path());
    assert!(with_bundle.is_locally_runnable());
}
/// A stage explicitly targeted at the cloud must not take the local streaming
/// fast path, even when a bundle path is present.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[test]
fn streaming_fast_path_network_target_disables_local_streaming() {
    let model_id = "streaming-cloud-model";
    let scratch = tempfile::tempdir().unwrap();
    let bundle_dir = scratch.path();

    let cache = Arc::new(StreamingFastPathCacheProvider::new(
        model_id,
        bundle_dir.to_path_buf(),
    ));
    let authority = LocalAuthority::with_cache_provider(cache);

    let descriptor = StageDescriptor::new("llm")
        .with_model(model_id)
        .with_bundle_path(bundle_dir.to_string_lossy().to_string())
        .with_target(ExecutionTarget::Cloud);
    let prompt = Envelope::new(EnvelopeKind::Text("prompt".to_string()));
    let device_metrics = DeviceMetrics::default();

    let decision = resolve_streaming_fast_path_route(
        &authority,
        &descriptor,
        model_id,
        &prompt,
        &device_metrics,
    );

    assert!(!decision.can_stream_locally);
    assert_eq!(decision.target, "cloud");
}
/// When the loaded policy matches text input, the resolved route reports the
/// policy as not allowed and the local streaming fast path is disabled.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[test]
fn streaming_fast_path_policy_deny_disables_local_streaming() {
    use xybrid_core::orchestrator::policy_engine::{DefaultPolicyEngine, PolicyEngine};

    let model_id = "streaming-denied-model";
    let scratch = tempfile::tempdir().unwrap();

    // Policy document with a rule that fires on text input.
    let policy_bytes = br#"
version: "0.1.0"
deny_cloud_if:
- input.kind == "text"
signature: "test-deny-text"
"#
    .to_vec();
    let mut engine = DefaultPolicyEngine::new();
    engine.load_policies(policy_bytes).expect("policy loads");

    let cache = Arc::new(StreamingFastPathCacheProvider::new(
        model_id,
        scratch.path().to_path_buf(),
    ));
    let authority = LocalAuthority::with_policy_and_cache(engine, cache);

    let descriptor = StageDescriptor::new("llm")
        .with_model(model_id)
        .with_target(ExecutionTarget::Device);
    let prompt = Envelope::new(EnvelopeKind::Text("prompt".to_string()));
    let device_metrics = DeviceMetrics::default();

    let decision = resolve_streaming_fast_path_route(
        &authority,
        &descriptor,
        model_id,
        &prompt,
        &device_metrics,
    );

    assert!(!decision.policy_allowed);
    assert!(
        !decision.can_stream_locally,
        "streaming fast path must not execute when policy denies the prompt"
    );
}
/// The fast path yields exactly two events, policy evaluation first and the
/// routing decision second, and the routing event's telemetry data carries
/// the local reliability hint and the model id.
#[cfg(any(feature = "llm-mistral", feature = "llm-llamacpp"))]
#[test]
fn streaming_fast_path_events_emit_policy_before_routing_with_hint() {
    let decision = StreamingFastPathRoute {
        policy_allowed: true,
        policy_reason: Some("Local policy evaluation".to_string()),
        target: "local".to_string(),
        reason: "[local] default_local (confidence: 80%)".to_string(),
        recent_abort_rate: 0.25,
        sample_size: 4,
        can_stream_locally: true,
    };

    let emitted = streaming_fast_path_events("llm", "qwen2.5-0.5b", &decision);

    // Ordering: policy evaluation precedes the routing decision.
    assert_eq!(emitted.len(), 2);
    assert!(matches!(
        emitted[0],
        OrchestratorEvent::PolicyEvaluated { .. }
    ));
    assert!(matches!(
        emitted[1],
        OrchestratorEvent::RoutingDecided { .. }
    ));

    // The routing event converts into telemetry with the expected header fields.
    let routing_telemetry = crate::telemetry::convert_orchestrator_event(&emitted[1]);
    assert_eq!(routing_telemetry.event_type, "RoutingDecided");
    assert_eq!(routing_telemetry.stage_name.as_deref(), Some("llm"));
    assert_eq!(routing_telemetry.target.as_deref(), Some("local"));

    // The JSON payload exposes the reliability hint and the model id.
    let raw_data = routing_telemetry.data.as_ref().expect("routing data");
    let payload: serde_json::Value = serde_json::from_str(raw_data).unwrap();
    let hint = &payload["local_reliability_hint"];
    assert_eq!(hint["recent_abort_rate"].as_f64(), Some(0.25));
    assert_eq!(hint["sample_size"].as_i64(), Some(4));
    assert_eq!(payload["model_id"].as_str(), Some("qwen2.5-0.5b"));
}
/// Stage ids are reported in declaration order and the stage count matches
/// the number of entries in the YAML document.
#[test]
fn test_pipeline_ref_stage_ids() {
    let source = r#"
name: "Multi-Stage"
stages:
- id: asr
model: wav2vec2-base-960h
version: "1.0"
- id: llm
model: gpt-4o-mini
provider: openai
- id: tts
model: kokoro-82m
"#;

    let pipeline_ref = PipelineRef::from_yaml(source).unwrap();

    let expected_ids = vec!["asr", "llm", "tts"];
    assert_eq!(pipeline_ref.stage_ids(), expected_ids);
    assert_eq!(pipeline_ref.stage_count(), 3);
}
/// Every `StageTarget` variant renders to its expected lowercase string form.
#[test]
fn test_stage_target_display() {
    let cases = [
        (StageTarget::Device, "device"),
        (StageTarget::Auto, "auto"),
        (StageTarget::Cloud, "cloud"),
        (
            StageTarget::Integration {
                provider: "openai".to_string(),
            },
            "integration:openai",
        ),
    ];

    for (target, expected) in cases.iter() {
        assert_eq!(target.to_string(), *expected);
    }
}
/// The provider string "deepseek" maps to `IntegrationProvider::DeepSeek`.
#[test]
fn parse_provider_accepts_deepseek() {
    let parsed = Pipeline::parse_provider("deepseek");
    let expected = Some(IntegrationProvider::DeepSeek);

    assert_eq!(parsed, expected);
}
/// Every `StageStatus` variant renders to its expected string form, including
/// the message carried by the error variant.
#[test]
fn test_stage_status_display() {
    let cases = [
        (StageStatus::Cached, "cached"),
        (StageStatus::NeedsDownload, "needs_download"),
        (StageStatus::Integration, "integration"),
        (StageStatus::Error("failed".to_string()), "error: failed"),
    ];

    for (status, expected) in cases.iter() {
        assert_eq!(status.to_string(), *expected);
    }
}
/// Serializing a `DownloadProgress` yields JSON containing the model id and
/// percent fields in compact `"key":value` form.
#[test]
fn test_download_progress_serialization() {
    let snapshot = DownloadProgress {
        model_id: "kokoro-82m".to_string(),
        percent: 75,
        bytes_downloaded: 150_000_000,
        bytes_total: 200_000_000,
        stage_index: 1,
        total_stages: 2,
    };

    let encoded = serde_json::to_string(&snapshot).unwrap();

    let expected_fragments = ["\"model_id\":\"kokoro-82m\"", "\"percent\":75"];
    for fragment in expected_fragments.iter() {
        assert!(encoded.contains(fragment));
    }
}
/// Serializing a `StageInfo` yields JSON containing the stage id and the
/// status variant name in compact `"key":value` form.
#[test]
fn test_stage_info_serialization() {
    let stage_info = StageInfo {
        id: "asr".to_string(),
        model_id: Some("wav2vec2".to_string()),
        target: StageTarget::Device,
        status: StageStatus::Cached,
        download_bytes: None,
    };

    let encoded = serde_json::to_string(&stage_info).unwrap();

    let expected_fragments = ["\"id\":\"asr\"", "\"status\":\"Cached\""];
    for fragment in expected_fragments.iter() {
        assert!(encoded.contains(fragment));
    }
}
/// `Xybrid::pipeline` parses a YAML document and exposes the declared name.
#[test]
fn test_xybrid_pipeline_convenience() {
    let source = r#"
name: "Test"
stages:
- test-stage@1.0
"#;

    let pipeline_ref = Xybrid::pipeline(source).unwrap();

    assert_eq!(pipeline_ref.name(), Some("Test"));
}
/// `is_audio` and `is_text` are each true only for their own variant.
#[test]
fn test_pipeline_input_type_methods() {
    // (is_audio, is_text) for the audio variant.
    let audio_flags = (
        PipelineInputType::Audio.is_audio(),
        PipelineInputType::Audio.is_text(),
    );
    assert_eq!(audio_flags, (true, false));

    // (is_text, is_audio) for the text variant.
    let text_flags = (
        PipelineInputType::Text.is_text(),
        PipelineInputType::Text.is_audio(),
    );
    assert_eq!(text_flags, (true, false));
}
/// `StageConfig` deserializes from both shorthand strings and mappings:
/// `model_id` drops an `@version` suffix, and `stage_id` returns an explicit id.
#[test]
fn test_stage_config_model_id() {
    // Shorthand string without a version.
    let bare: StageConfig = serde_yaml::from_str(r#""kokoro-82m""#).unwrap();
    assert_eq!(bare.model_id(), "kokoro-82m");

    // Shorthand string with an `@version` suffix.
    let versioned: StageConfig = serde_yaml::from_str(r#""wav2vec2@1.0""#).unwrap();
    assert_eq!(versioned.model_id(), "wav2vec2");

    // Mapping form that only names the model.
    let model_only_yaml = r#"
model: gpt-4o-mini
"#;
    let model_only: StageConfig = serde_yaml::from_str(model_only_yaml).unwrap();
    assert_eq!(model_only.model_id(), "gpt-4o-mini");

    // Mapping form that only names the stage id.
    let id_only_yaml = r#"
id: asr
"#;
    let id_only: StageConfig = serde_yaml::from_str(id_only_yaml).unwrap();
    assert_eq!(id_only.stage_id(), "asr");
}
}