use std::env;
use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use xybrid_core::device::{MemoryPressure, ResourceSnapshot};
use xybrid_core::ir::{Envelope, EnvelopeKind};
use xybrid_core::orchestrator::authority::test_seams::StagedResourceProvider;
use xybrid_core::runtime_adapter::types::PartialToken;
use xybrid_core::runtime_adapter::CloudRuntimeAdapter;
use xybrid_sdk::run_options::{AbortPolicy, AbortSignal, RunOptions};
use xybrid_sdk::telemetry::{flush_platform_telemetry, init_platform_telemetry_from_env};
use xybrid_sdk::{ModelLoader, SeamInfo};
/// Registry id of the on-device model that serves the local leg of the run.
const LOCAL_MODEL_ID: &str = "qwen2.5-0.5b-instruct";
/// First constructor argument to `StagedResourceProvider::new`. Presumably this is how
/// many normal resource readings are served before the critical snapshot is returned.
/// NOTE(review): confirm against the provider's docs.
const NORMAL_READS_BEFORE_PRESSURE: usize = 3;
/// Prompt text placed in the request envelope for the demo run.
const PROMPT: &str = "Write a short story about a lighthouse keeper who learns to brew tea.";
/// Builds the "pressured" resource reading handed to the staged provider.
///
/// Starts from `ResourceSnapshot::unknown()` and overrides only `memory_pressure`,
/// setting it to `MemoryPressure::Critical`. Every other field keeps whatever
/// `unknown()` produces.
fn critical_snapshot() -> ResourceSnapshot {
    let mut pressured = ResourceSnapshot::unknown();
    pressured.memory_pressure = MemoryPressure::Critical;
    pressured
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let total_start = Instant::now();
println!("═══════════════════════════════════════════════════════════════════");
println!(" Xybrid Cloud Fallback Demo");
println!("═══════════════════════════════════════════════════════════════════");
println!();
println!(" Prompt: {}", PROMPT);
println!();
if init_platform_telemetry_from_env() {
println!(" Telemetry: publisher initialized");
if let Ok(url) = env::var("XYBRID_INGEST_URL") {
println!(" ingest endpoint: {}", url);
} else {
println!(" ingest endpoint: https://ingest.xybrid.dev (default)");
}
} else {
eprintln!(
" ⚠ Telemetry: XYBRID_API_KEY not set — events will be dropped, no trace will \
appear in the console. Mint a key in the platform console and export it."
);
}
println!();
let load_start = Instant::now();
let model = ModelLoader::from_registry(LOCAL_MODEL_ID).load()?;
println!(
" Loaded {} (local) in {:?}",
LOCAL_MODEL_ID,
load_start.elapsed()
);
let provider = Arc::new(StagedResourceProvider::new(
NORMAL_READS_BEFORE_PRESSURE,
critical_snapshot(),
));
let cloud_url =
env::var("XYBRID_CLOUD_URL").unwrap_or_else(|_| "http://localhost:3001/v1".to_string());
let cloud_adapter = CloudRuntimeAdapter::with_gateway(&cloud_url);
let options = RunOptions::new()
.with_abort_policy(
AbortPolicy::default()
.stop_on(AbortSignal::MemoryPressureCritical)
.with_cloud_fallback(true)
.with_max_grace_tokens(0),
)
.with_resource_provider(provider);
let mut envelope = Envelope::new(EnvelopeKind::Text(PROMPT.to_string()));
envelope
.metadata
.insert("provider".to_string(), "deepseek".to_string());
envelope
.metadata
.insert("model".to_string(), "deepseek-chat".to_string());
envelope
.metadata
.insert("max_tokens".to_string(), "200".to_string());
envelope.metadata.insert(
"system_prompt".to_string(),
"You are a concise storyteller.".to_string(),
);
let on_cloud = Arc::new(AtomicBool::new(false));
let local_tokens = Arc::new(Mutex::new(0u32));
let cloud_tokens = Arc::new(Mutex::new(0u32));
let captured_seam: Arc<Mutex<Option<SeamInfo>>> = Arc::new(Mutex::new(None));
let on_cloud_for_token = on_cloud.clone();
let local_tokens_for_token = local_tokens.clone();
let cloud_tokens_for_token = cloud_tokens.clone();
let mut on_token =
move |token: PartialToken| -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if on_cloud_for_token.load(Ordering::SeqCst) {
let mut count = cloud_tokens_for_token.lock().unwrap();
if *count == 0 {
print!("[cloud] ");
}
*count += 1;
} else {
let mut count = local_tokens_for_token.lock().unwrap();
if *count == 0 {
print!("[device] ");
}
*count += 1;
}
print!("{}", token.token);
std::io::stdout().flush().ok();
Ok(())
};
let on_cloud_for_seam = on_cloud.clone();
let captured_seam_for_seam = captured_seam.clone();
let mut on_seam = move |info: SeamInfo| {
println!();
println!();
println!(
" ↘ cloud fallback ({}) — local leg: {} tokens / {} ms",
info.reason, info.local_tokens, info.local_latency_ms
);
println!();
on_cloud_for_seam.store(true, Ordering::SeqCst);
*captured_seam_for_seam.lock().unwrap() = Some(info);
};
let run_start = Instant::now();
let result = model.run_streaming_with_fallback(
&envelope,
&options,
&cloud_adapter,
&mut on_token,
&mut on_seam,
)?;
let run_latency_ms = run_start.elapsed().as_millis() as u32;
println!();
println!();
println!("═══════════════════════════════════════════════════════════════════");
println!(" Summary");
println!("═══════════════════════════════════════════════════════════════════");
let local = *local_tokens.lock().unwrap();
let cloud = *cloud_tokens.lock().unwrap();
if let Some(info) = captured_seam.lock().unwrap().clone() {
let cloud_ms = run_latency_ms.saturating_sub(info.local_latency_ms);
println!(" correlation_id : {}", info.correlation_id);
println!(" abort reason : {}", info.reason);
println!(" target chain : device → cloud");
println!(
" local leg : {} tokens / {} ms / aborted",
local, info.local_latency_ms
);
println!(
" cloud leg : {} tokens / ~{} ms / completed",
cloud, cloud_ms
);
} else {
println!(
" No fallback fired — the local leg completed in {} ms with {} tokens.",
run_latency_ms, local
);
}
println!(" total wallclock: {:?}", total_start.elapsed());
println!(
" result text : {}",
result.text().unwrap_or("(non-text)")
);
println!();
println!(" Flushing telemetry (channel drain + buffer flush)…");
std::thread::sleep(std::time::Duration::from_millis(200));
flush_platform_telemetry();
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(())
}