#![allow(dead_code)]
#![allow(
clippy::too_many_arguments,
clippy::ptr_arg,
clippy::map_identity,
clippy::while_let_loop
)]
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use tokio::sync::mpsc;
use xybrid_core::context::{DeviceMetrics, StageDescriptor};
use xybrid_core::ir::{Envelope, EnvelopeKind};
use xybrid_core::orchestrator::routing_engine::LocalAvailability;
use xybrid_core::orchestrator::{Orchestrator, StageExecutionResult};
pub mod benchmark;
pub mod cache;
pub mod device;
pub mod llm;
pub mod metadata_gen;
pub mod model;
pub mod pipeline;
pub mod platform;
pub mod registry_client;
pub mod result;
pub mod run_options;
pub mod source;
pub mod stream;
pub mod streaming;
pub mod telemetry;
pub mod telemetry_optout;
pub use xybrid_core::bundler;
pub use xybrid_core::cache_provider::CacheProvider;
pub use xybrid_core::context;
pub use xybrid_core::conversation::ConversationContext;
pub use xybrid_core::device::{
clear_battery_level, clear_thermal_state, set_battery_level, set_thermal_state, DeviceProfile,
MemoryPressure, ResourceMonitor, ResourceSnapshot, ResourceTelemetryMode, ResourceUsageSummary,
RunGuard as ResourceRunGuard, ThermalState,
};
pub use xybrid_core::execution;
pub use xybrid_core::features;
pub use xybrid_core::ir;
pub use xybrid_core::orchestrator;
pub use xybrid_core::orchestrator::routing_engine;
pub use xybrid_core::execution::{VoiceConfig, VoiceInfo};
pub use xybrid_core::runtime_adapter::types::{
GenerationConfig, PartialToken, StreamingCallback, StreamingError,
};
#[doc(hidden)]
pub use xybrid_core::execution as template_executor;
#[doc(hidden)]
pub use xybrid_core::execution::template as execution_template;
pub use benchmark::{compare_benchmarks, BenchmarkResult, ExecutionProviderInfo};
pub use cache::{CacheManager, CacheStatus, SdkCacheProvider};
pub use device::{device_id, Device};
pub use llm::{
default_gateway_url, set_gateway_url, ChatMessage, CompletionRequest, CompletionResponse,
LlmBackend, LlmClientConfig, MessageRole, TokenUsage,
};
pub use model::SdkError;
pub use model::{
ModelLoader, SdkResult, SeamInfo, StreamConfig, StreamEvent, StreamToken, XybridModel,
};
pub use platform::current_platform;
pub use registry_client::{CacheStats, ModelSummary, RegistryClient, ResolvedVariant};
pub use run_options::{AbortPolicy, AbortReason, AbortSignal, CancellationToken, RunOptions};
pub use xybrid_core::http::RetryableError;
pub use pipeline::{
AudioInputConfig,
AudioSampleFormat,
ConfigOutputType,
DownloadProgress,
FfiPipelineExecutionResult,
FfiStageExecutionResult,
InputConfig,
InputType,
Pipeline,
PipelineExecutionResult,
PipelineInputType,
PipelineRef,
PipelineSource,
StageInfo,
StageStatus,
StageTarget,
StageTiming as PipelineStageTiming, TextInputConfig,
Xybrid,
};
pub use result::{InferenceMetrics, InferenceResult, OutputType, StageLatency};
pub use source::ModelSource;
pub use stream::{PartialResult, StreamState, StreamStats, TranscriptionResult, XybridStream};
pub use streaming::{FfiPartialResult, FfiStreamState, FfiStreamStats, FfiStreamingConfig};
pub use telemetry::{
bridge_orchestrator_events,
convert_orchestrator_event,
flush_platform_telemetry,
init_platform_telemetry,
init_platform_telemetry_from_env,
publish_telemetry_event,
register_telemetry_sender,
set_telemetry_pipeline_context,
shutdown_platform_telemetry,
subscribe_orchestrator_events,
BridgeError,
BridgeHandle,
HttpTelemetryExporter,
OrchestratorEventBridge,
TelemetryConfig,
TelemetryEvent,
TelemetrySender,
};
pub use telemetry_optout::is_telemetry_opted_out;
pub use xybrid_core::event_bus::OrchestratorEvent;
pub use xybrid_core::execution::listener::{
clear_execution_listener, set_execution_listener, ExecutionEvent,
};
use std::sync::OnceLock;
/// Process-wide SDK configuration slot; `init_sdk_cache_dir` writes it via `OnceLock::set`.
static SDK_CONFIG: OnceLock<SdkConfig> = OnceLock::new();
/// Process-wide binding identifier slot; `set_binding` writes it via `OnceLock::set`.
static BINDING: OnceLock<&'static str> = OnceLock::new();
/// Binding name used whenever no binding has been set explicitly.
pub const DEFAULT_BINDING: &str = "rust";
/// Package version, captured from `CARGO_PKG_VERSION` at compile time.
pub const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Records the binding identifier for this process.
///
/// The value is stored in a `OnceLock` and the result of `set` is discarded,
/// so a call made after the slot is already filled has no effect and reports nothing.
pub fn set_binding(binding: &'static str) {
let _ = BINDING.set(binding);
}
/// Returns the binding identifier stored by `set_binding`, or
/// `DEFAULT_BINDING` when nothing has been stored yet.
pub fn get_binding() -> &'static str {
    match BINDING.get() {
        Some(&name) => name,
        None => DEFAULT_BINDING,
    }
}
/// SDK-level configuration values.
#[derive(Debug, Clone, Default)]
pub struct SdkConfig {
    /// Cache directory, when one has been configured.
    pub cache_dir: Option<std::path::PathBuf>,
    /// Binding identifier override; `None` means `DEFAULT_BINDING` applies.
    pub binding: Option<&'static str>,
}

impl SdkConfig {
    /// Returns this configuration with `binding` recorded; all other fields are carried over.
    pub fn with_binding(self, binding: &'static str) -> Self {
        Self {
            binding: Some(binding),
            ..self
        }
    }

    /// Resolves the effective binding: the stored override if present, else `DEFAULT_BINDING`.
    pub fn binding(&self) -> &'static str {
        match self.binding {
            Some(name) => name,
            None => DEFAULT_BINDING,
        }
    }
}
/// Configures the SDK cache directory and exports related environment variables.
///
/// When `cache_dir` is valid UTF-8, the following process environment variables are written:
/// - `HOME`: set to the cache directory's parent, but only if `HOME` is not present at all.
/// - `HF_HOME`: set to `<cache_dir>/huggingface`.
/// - `HF_HUB_OFFLINE`: set to `"1"`.
/// - `XDG_CACHE_HOME`: set to the cache directory, but only if it is not present at all.
///
/// A non-UTF-8 `cache_dir` skips all environment changes.
///
/// In every case the directory is then offered to the process-wide `SDK_CONFIG` slot. That
/// slot is a `OnceLock` and the result of `set` is discarded, so if it is already filled the
/// stored directory is left as it was, while the environment variables above are still
/// (re)written.
pub fn init_sdk_cache_dir(cache_dir: impl Into<std::path::PathBuf>) {
    let cache_path = cache_dir.into();

    if let Some(cache_str) = cache_path.to_str() {
        // Presence is checked with `var_os`: `var(..).is_err()` is also true for a variable
        // that IS set but holds non-Unicode data, which would clobber the caller's value.
        if std::env::var_os("HOME").is_none() {
            if let Some(parent_str) = cache_path.parent().and_then(|parent| parent.to_str()) {
                std::env::set_var("HOME", parent_str);
            }
        }

        let hf_cache = cache_path.join("huggingface");
        if let Some(hf_str) = hf_cache.to_str() {
            std::env::set_var("HF_HOME", hf_str);
        }
        std::env::set_var("HF_HUB_OFFLINE", "1");

        if std::env::var_os("XDG_CACHE_HOME").is_none() {
            std::env::set_var("XDG_CACHE_HOME", cache_str);
        }
    }

    let config = SdkConfig {
        cache_dir: Some(cache_path),
        ..SdkConfig::default()
    };
    // Ignoring the result is deliberate: an already-filled slot is kept as-is.
    let _ = SDK_CONFIG.set(config);
}
/// Returns a copy of the cache directory held in `SDK_CONFIG`.
///
/// Yields `None` when the configuration slot is empty or holds no cache directory.
pub fn get_sdk_cache_dir() -> Option<std::path::PathBuf> {
    let config = SDK_CONFIG.get()?;
    config.cache_dir.clone()
}
/// Reports whether `SDK_CONFIG` has been filled with a configuration that carries a cache
/// directory.
pub fn is_sdk_cache_configured() -> bool {
    match SDK_CONFIG.get() {
        Some(config) => config.cache_dir.is_some(),
        None => false,
    }
}
/// Stores `api_key` in the `XYBRID_API_KEY` environment variable of the current process.
pub fn set_api_key(api_key: &str) {
std::env::set_var("XYBRID_API_KEY", api_key);
}
/// Stores `api_key` in the environment variable associated with `provider`.
///
/// Provider names are matched case-insensitively (via `to_lowercase`):
/// - `openai` -> `OPENAI_API_KEY`
/// - `anthropic` / `claude` -> `ANTHROPIC_API_KEY`
/// - `google` / `gemini` -> `GOOGLE_API_KEY`
/// - `openrouter` -> `OPENROUTER_API_KEY`
/// - `elevenlabs` -> `ELEVENLABS_API_KEY`
///
/// Any other provider is written to `<PROVIDER>_API_KEY`, where `<PROVIDER>` is the
/// upper-cased provider name.
pub fn set_provider_api_key(provider: &str, api_key: &str) {
    let normalized = provider.to_lowercase();
    let well_known = match normalized.as_str() {
        "openai" => Some("OPENAI_API_KEY"),
        "anthropic" | "claude" => Some("ANTHROPIC_API_KEY"),
        "google" | "gemini" => Some("GOOGLE_API_KEY"),
        "openrouter" => Some("OPENROUTER_API_KEY"),
        "elevenlabs" => Some("ELEVENLABS_API_KEY"),
        _ => None,
    };

    if let Some(name) = well_known {
        std::env::set_var(name, api_key);
    } else {
        // Unknown provider: derive the variable name from the provider itself.
        let derived = format!("{}_API_KEY", provider.to_uppercase());
        std::env::set_var(&derived, api_key);
    }
}
/// Reads the `XYBRID_API_KEY` environment variable.
///
/// Returns `None` whenever `std::env::var` reports an error for that variable.
pub fn get_api_key() -> Option<String> {
std::env::var("XYBRID_API_KEY").ok()
}
/// Reports whether `std::env::var` can read the `XYBRID_API_KEY` environment variable.
pub fn has_api_key() -> bool {
std::env::var("XYBRID_API_KEY").is_ok()
}
/// Starts an SDK initialization builder with every option unset (`XybridInit::default()`).
///
/// Nothing is applied until `XybridInit::run` is called on the returned builder.
pub fn init() -> XybridInit {
XybridInit::default()
}
/// Builder that collects SDK initialization options; they take effect only when `run` is called.
#[derive(Debug, Clone, Default)]
#[must_use = "call .run() to apply the configuration"]
pub struct XybridInit {
/// API key; `run` passes it to `set_api_key` and uses it to configure platform telemetry.
api_key: Option<String>,
/// Cache directory; `run` passes it to `init_sdk_cache_dir`.
cache_dir: Option<std::path::PathBuf>,
/// Gateway URL; `run` passes it to `set_gateway_url`.
gateway_url: Option<String>,
/// Telemetry ingest endpoint; `run` falls back to `telemetry::DEFAULT_INGEST_URL` when unset.
ingest_url: Option<String>,
/// Resource telemetry mode applied to the telemetry configuration when an API key is present.
resource_telemetry: Option<xybrid_core::device::ResourceTelemetryMode>,
/// Binding identifier; `run` passes it to `set_binding`.
binding: Option<&'static str>,
}
impl XybridInit {
    /// Records the API key to apply when `run` is called.
    pub fn api_key(self, key: impl Into<String>) -> Self {
        Self {
            api_key: Some(key.into()),
            ..self
        }
    }

    /// Records the cache directory to apply when `run` is called.
    pub fn cache_dir(self, dir: impl Into<std::path::PathBuf>) -> Self {
        Self {
            cache_dir: Some(dir.into()),
            ..self
        }
    }

    /// Records the gateway URL to apply when `run` is called.
    pub fn gateway_url(self, url: impl Into<String>) -> Self {
        Self {
            gateway_url: Some(url.into()),
            ..self
        }
    }

    /// Records the telemetry ingest URL to use when `run` is called.
    pub fn ingest_url(self, url: impl Into<String>) -> Self {
        Self {
            ingest_url: Some(url.into()),
            ..self
        }
    }

    /// Records the resource telemetry mode to use when `run` is called.
    pub fn resource_telemetry(self, mode: xybrid_core::device::ResourceTelemetryMode) -> Self {
        Self {
            resource_telemetry: Some(mode),
            ..self
        }
    }

    /// Records the binding identifier to apply when `run` is called.
    pub fn binding(self, binding: &'static str) -> Self {
        Self {
            binding: Some(binding),
            ..self
        }
    }

    /// Applies every recorded option, in this order: binding, cache directory, gateway URL,
    /// API key, platform telemetry.
    ///
    /// Platform telemetry is initialized only when an API key was provided. If an ingest URL
    /// was given without an API key, a warning is logged and telemetry is not initialized.
    pub fn run(self) {
        let Self {
            api_key,
            cache_dir,
            gateway_url,
            ingest_url,
            resource_telemetry,
            binding,
        } = self;

        if let Some(name) = binding {
            set_binding(name);
        }
        if let Some(path) = cache_dir {
            init_sdk_cache_dir(path);
        }
        if let Some(url) = gateway_url.as_deref() {
            set_gateway_url(url);
        }

        match api_key.as_deref() {
            Some(key) => {
                set_api_key(key);

                let endpoint = ingest_url
                    .as_deref()
                    .unwrap_or(telemetry::DEFAULT_INGEST_URL);
                let base = telemetry::TelemetryConfig::new(endpoint, key);
                let config = match resource_telemetry {
                    Some(mode) => base.with_resource_telemetry(mode),
                    None => base,
                };
                telemetry::init_platform_telemetry(config);
            }
            None if ingest_url.is_some() => {
                log::warn!(
                    target: "xybrid_sdk",
                    "ingest_url set without api_key; telemetry exporter not started"
                );
            }
            None => {}
        }
    }
}
/// Re-exports a selection of `xybrid_core` context, IR and orchestrator types
/// under a single module.
pub mod prelude {
pub use xybrid_core::context::{DeviceMetrics, StageDescriptor};
pub use xybrid_core::ir::{Envelope, EnvelopeKind};
pub use xybrid_core::orchestrator::routing_engine::{
LocalAvailability, RouteTarget, RoutingDecision,
};
pub use xybrid_core::orchestrator::{Orchestrator, OrchestratorError, StageExecutionResult};
}
/// Receiving half of the channel that `subscribe_events` fills with orchestrator events.
pub struct EventStream {
/// Tokio mpsc receiver holding the forwarded `OrchestratorEvent`s.
receiver: mpsc::Receiver<OrchestratorEvent>,
}
impl EventStream {
/// Awaits the next event by delegating to the inner receiver's `recv`;
/// its result is returned unchanged.
pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
self.receiver.recv().await
}
/// Delegates to the inner receiver's `try_recv`; its result is returned unchanged.
pub fn try_recv(&mut self) -> Result<OrchestratorEvent, mpsc::error::TryRecvError> {
self.receiver.try_recv()
}
}
/// Subscribes to the orchestrator's event bus and exposes the events as an [`EventStream`].
///
/// A dedicated OS thread pulls events from the bus subscription and pushes them into a
/// channel with capacity 100 using `blocking_send`. The thread exits when the subscription's
/// `recv` returns an error or when sending into the channel fails.
pub fn subscribe_events(orchestrator: &Orchestrator) -> EventStream {
    let (sender, receiver) = mpsc::channel(100);
    let bus = orchestrator.event_bus();
    let subscription = bus.subscribe();

    std::thread::spawn(move || {
        while let Ok(event) = subscription.recv() {
            if sender.blocking_send(event).is_err() {
                break;
            }
        }
    });

    EventStream { receiver }
}
/// Re-exports `route` from the `xybrid_macros` crate.
pub mod hybrid {
pub use xybrid_macros::route;
}
/// Schema of the legacy pipeline YAML file parsed by `run_pipeline` and `run_pipeline_async`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyPipelineConfig {
    /// Optional pipeline name; used for logging and echoed back in the result.
    #[serde(default)]
    name: Option<String>,
    /// Stage names, in execution order; each becomes one stage descriptor.
    stages: Vec<String>,
    /// Description of the pipeline input.
    input: LegacyInputConfig,
    /// Accepted for compatibility with existing files; never read by the runners.
    #[serde(default)]
    #[allow(dead_code)]
    metrics: Option<serde_yaml::Value>,
    /// Per-stage availability hints. The runners ignore the contents (they only log a
    /// warning when the map is non-empty), so the key is optional and defaults to empty
    /// rather than failing the parse of a file that omits it.
    #[serde(default)]
    availability: HashMap<String, bool>,
}
/// Input section of a legacy pipeline config.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyInputConfig {
/// Input kind. The legacy runners recognize `Audio`/`audio`, `Text`/`text` and
/// `Embedding`/`embedding`; any other value is used verbatim as a text payload.
kind: String,
}
/// Per-stage summary produced by the legacy pipeline runners.
#[derive(Debug, Clone, Serialize)]
pub struct StageTiming {
/// Stage name, copied from the stage's execution result.
pub name: String,
/// Latency in milliseconds, copied from the stage's execution result.
pub latency_ms: u32,
/// String form of the routing decision's target for this stage.
pub target: String,
}
/// Summary returned by `run_pipeline` and `run_pipeline_async`.
#[derive(Debug, Clone, Serialize)]
pub struct PipelineResult {
/// Pipeline name from the config file, if one was given.
pub name: Option<String>,
/// One entry per stage execution result, in the order they were returned.
pub stages: Vec<StageTiming>,
/// Wall-clock time measured around the pipeline execution, in milliseconds.
pub total_latency_ms: u32,
/// Kind string of the last stage's output, or `"unknown"` when there were no stage results.
pub final_output: String,
}
/// Errors returned by the legacy pipeline runners.
#[derive(Debug, thiserror::Error)]
pub enum PipelineConfigError {
/// The config file could not be read; the payload is `"<path>: <io error>"`.
#[error("Failed to read config file: {0}")]
ConfigReadError(String),
/// The config file is not valid for the legacy schema; the payload is `"<path>: <parse error>"`.
#[error("Failed to parse YAML config: {0}")]
ConfigParseError(String),
/// The orchestrator reported a failure; the payload is that error's display text.
#[error("Pipeline execution failed: {0}")]
ExecutionError(String),
}
pub fn run_pipeline(config_path: &str) -> Result<PipelineResult, PipelineConfigError> {
let config_content = fs::read_to_string(config_path)
.map_err(|e| PipelineConfigError::ConfigReadError(format!("{}: {}", config_path, e)))?;
let config: LegacyPipelineConfig = serde_yaml::from_str(&config_content)
.map_err(|e| PipelineConfigError::ConfigParseError(format!("{}: {}", config_path, e)))?;
if let Some(name) = &config.name {
log::info!(target: "xybrid_sdk", "Running pipeline: {}", name);
} else {
log::info!(target: "xybrid_sdk", "Running pipeline from: {}", config_path);
}
let stages: Vec<StageDescriptor> = config
.stages
.iter()
.map(|name| StageDescriptor::new(name.clone()))
.collect();
log::debug!(target: "xybrid_sdk", "Pipeline has {} stages", stages.len());
let kind = match config.input.kind.as_str() {
"Audio" | "audio" => EnvelopeKind::Audio(vec![]),
"Text" | "text" => EnvelopeKind::Text(String::new()),
"Embedding" | "embedding" => EnvelopeKind::Embedding(vec![]),
_ => EnvelopeKind::Text(config.input.kind.clone()),
};
let input = Envelope::new(kind);
let metrics = DeviceMetrics::default();
if !config.availability.is_empty() {
log::warn!(
target: "xybrid_sdk",
"Legacy pipeline availability hints are ignored; use PipelineRef::load() and load_models() for local execution"
);
}
let availability_fn =
move |_stage: &str| -> LocalAvailability { LocalAvailability::new(false) };
let mut orchestrator = Orchestrator::new();
let orchestrator_bridge = telemetry::subscribe_orchestrator_events(&orchestrator);
let start_time = std::time::Instant::now();
let results: Vec<StageExecutionResult> = orchestrator
.execute_pipeline(&stages, &input, &metrics, &availability_fn)
.map_err(|e| {
orchestrator_bridge.drain();
PipelineConfigError::ExecutionError(format!("{}", e))
})?;
orchestrator_bridge.drain();
let total_latency_ms = start_time.elapsed().as_millis() as u32;
let stage_timings: Vec<StageTiming> = results
.iter()
.map(|result| StageTiming {
name: result.stage.clone(),
latency_ms: result.latency_ms,
target: result.routing_decision.target.to_string(),
})
.collect();
let final_output = results
.last()
.map(|r| r.output.kind_str().to_string())
.unwrap_or_else(|| "unknown".to_string());
log::info!(target: "xybrid_sdk", "Pipeline completed in {}ms", total_latency_ms);
Ok(PipelineResult {
name: config.name.clone(),
stages: stage_timings,
total_latency_ms,
final_output,
})
}
pub async fn run_pipeline_async(config_path: &str) -> Result<PipelineResult, PipelineConfigError> {
let config_content = tokio::fs::read_to_string(config_path)
.await
.map_err(|e| PipelineConfigError::ConfigReadError(format!("{}: {}", config_path, e)))?;
let config: LegacyPipelineConfig = serde_yaml::from_str(&config_content)
.map_err(|e| PipelineConfigError::ConfigParseError(format!("{}: {}", config_path, e)))?;
if let Some(name) = &config.name {
log::info!(target: "xybrid_sdk", "Running pipeline (async): {}", name);
} else {
log::info!(target: "xybrid_sdk", "Running pipeline (async) from: {}", config_path);
}
let stages: Vec<StageDescriptor> = config
.stages
.iter()
.map(|name| StageDescriptor::new(name.clone()))
.collect();
log::debug!(target: "xybrid_sdk", "Pipeline has {} stages", stages.len());
let kind = match config.input.kind.as_str() {
"Audio" | "audio" => EnvelopeKind::Audio(vec![]),
"Text" | "text" => EnvelopeKind::Text(String::new()),
"Embedding" | "embedding" => EnvelopeKind::Embedding(vec![]),
_ => EnvelopeKind::Text(config.input.kind.clone()),
};
let input = Envelope::new(kind);
let metrics = DeviceMetrics::default();
if !config.availability.is_empty() {
log::warn!(
target: "xybrid_sdk",
"Legacy pipeline availability hints are ignored; use PipelineRef::load() and load_models() for local execution"
);
}
let availability_fn =
move |_stage: &str| -> LocalAvailability { LocalAvailability::new(false) };
let mut orchestrator = Orchestrator::new();
let orchestrator_bridge = telemetry::subscribe_orchestrator_events(&orchestrator);
let start_time = std::time::Instant::now();
let results: Vec<StageExecutionResult> = orchestrator
.execute_pipeline_async(&stages, &input, &metrics, &availability_fn)
.await
.map_err(|e| {
orchestrator_bridge.drain();
PipelineConfigError::ExecutionError(format!("{}", e))
})?;
orchestrator_bridge.drain();
let total_latency_ms = start_time.elapsed().as_millis() as u32;
let stage_timings: Vec<StageTiming> = results
.iter()
.map(|result| StageTiming {
name: result.stage.clone(),
latency_ms: result.latency_ms,
target: result.routing_decision.target.to_string(),
})
.collect();
let final_output = results
.last()
.map(|r| r.output.kind_str().to_string())
.unwrap_or_else(|| "unknown".to_string());
log::info!(target: "xybrid_sdk", "Pipeline completed in {}ms", total_latency_ms);
Ok(PipelineResult {
name: config.name.clone(),
stages: stage_timings,
total_latency_ms,
final_output,
})
}
// Unit tests for `SdkConfig` binding resolution and the `DEFAULT_BINDING` constant.
#[cfg(test)]
mod sdk_config_tests {
use super::{SdkConfig, DEFAULT_BINDING};
// Pins the literal value of the default binding.
#[test]
fn default_binding_is_rust() {
assert_eq!(DEFAULT_BINDING, "rust");
}
// A default config stores no binding but still resolves to "rust".
#[test]
fn default_config_resolves_to_default_binding() {
let cfg = SdkConfig::default();
assert!(cfg.binding.is_none());
assert_eq!(cfg.binding(), "rust");
}
// `with_binding` both stores the override and changes what `binding()` resolves to.
#[test]
fn with_binding_overrides_default() {
let cfg = SdkConfig::default().with_binding("flutter");
assert_eq!(cfg.binding, Some("flutter"));
assert_eq!(cfg.binding(), "flutter");
}
// `with_binding` must not disturb an already-set `cache_dir`.
#[test]
fn with_binding_preserves_other_fields() {
let cfg = SdkConfig {
cache_dir: Some(std::path::PathBuf::from("/tmp/xybrid-cache")),
..SdkConfig::default()
}
.with_binding("kotlin");
assert_eq!(cfg.binding(), "kotlin");
assert_eq!(
cfg.cache_dir.as_deref(),
Some(std::path::Path::new("/tmp/xybrid-cache"))
);
}
}
// Unit tests for the `XybridInit` builder.
//
// The builder's fields are private, so these tests observe stored values through the
// derived `Debug` output. None of them call `run`.
#[cfg(test)]
mod xybrid_init_tests {
use super::{init, XybridInit};
use xybrid_core::device::ResourceTelemetryMode;
// `init()` must be indistinguishable from `XybridInit::default()`.
#[test]
fn init_returns_default_anonymous_builder() {
let builder = init();
let default = XybridInit::default();
assert_eq!(format!("{:?}", builder), format!("{:?}", default));
}
// The API key passed to the setter shows up in the builder's Debug output.
#[test]
fn api_key_setter_stores_value() {
let builder = init().api_key("xy_test_123");
let dbg = format!("{:?}", builder);
assert!(dbg.contains("xy_test_123"), "debug = {}", dbg);
}
// The cache directory passed to the setter shows up in the builder's Debug output.
#[test]
fn cache_dir_setter_stores_path() {
let builder = init().cache_dir("/tmp/xybrid-test-cache");
let dbg = format!("{:?}", builder);
assert!(
dbg.contains("xybrid-test-cache"),
"cache_dir not stored, debug = {}",
dbg
);
}
// The ingest URL passed to the setter shows up in the builder's Debug output.
#[test]
fn ingest_url_setter_stores_value() {
let builder = init().ingest_url("http://192.168.1.78:8081");
let dbg = format!("{:?}", builder);
assert!(dbg.contains("192.168.1.78"), "debug = {}", dbg);
}
// The gateway URL passed to the setter shows up in the builder's Debug output.
#[test]
fn gateway_url_setter_stores_value() {
let builder = init().gateway_url("https://gateway.example/v1");
let dbg = format!("{:?}", builder);
assert!(dbg.contains("gateway.example"), "debug = {}", dbg);
}
// The resource telemetry mode passed to the setter shows up in the builder's Debug output.
#[test]
fn resource_telemetry_setter_stores_mode() {
let builder = init().resource_telemetry(ResourceTelemetryMode::Boundary);
let dbg = format!("{:?}", builder);
assert!(dbg.contains("Boundary"), "debug = {}", dbg);
}
// Compile-time check that every setter returns the builder so calls can be chained.
#[test]
fn builder_is_chainable() {
let _builder = init()
.api_key("xy_test")
.cache_dir("/tmp/x")
.gateway_url("https://gateway")
.ingest_url("https://ingest")
.resource_telemetry(ResourceTelemetryMode::Off)
.binding("rust");
}
}