use crate::download::manager::{DownloadProgress, DownloadProgressCallback};
use crate::runtime::entry::RuntimeEntryDescriptor;
use crate::skill::manager::{SkillLifecycleAction, SkillManagementAuthority, SkillOperationPlane};
use crate::skill::source::SkillInstallSourceType;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
/// Shared, thread-safe callback invoked with each [`RuntimeSkillLifecycleEvent`].
///
/// Installed through [`set_skill_lifecycle_callback`].
pub type RuntimeSkillLifecycleCallback = Arc<dyn Fn(&RuntimeSkillLifecycleEvent) + Send + Sync>;
/// Shared, thread-safe callback invoked with each [`RuntimeSkillOperationProgressEvent`].
///
/// Installed through [`set_skill_operation_progress_callback`].
pub type RuntimeSkillOperationProgressCallback =
Arc<dyn Fn(&RuntimeSkillOperationProgressEvent) + Send + Sync>;
/// Shared, thread-safe callback invoked with each [`RuntimeEntryRegistryDelta`].
///
/// Installed through [`set_entry_registry_callback`].
pub type RuntimeEntryRegistryCallback = Arc<dyn Fn(&RuntimeEntryRegistryDelta) + Send + Sync>;
/// Shared, thread-safe host callback that handles a [`RuntimeSkillManagementRequest`].
///
/// It returns a JSON value on success or an error message on failure.
/// Installed through [`set_skill_management_callback`].
pub type RuntimeSkillManagementCallback =
Arc<dyn Fn(&RuntimeSkillManagementRequest) -> Result<Value, String> + Send + Sync>;
/// Shared, thread-safe host callback that handles a [`RuntimeHostToolRequest`].
///
/// It returns a JSON value on success or an error message on failure.
/// Installed through [`set_host_tool_callback`].
pub type RuntimeHostToolCallback =
Arc<dyn Fn(&RuntimeHostToolRequest) -> Result<Value, String> + Send + Sync>;
/// Shared, thread-safe host callback that answers a [`RuntimeModelEmbedRequest`].
///
/// It returns a [`RuntimeModelEmbedResponse`] or a structured [`RuntimeModelError`].
/// Installed through [`set_model_embed_callback`].
pub type RuntimeModelEmbedCallback = Arc<
dyn Fn(&RuntimeModelEmbedRequest) -> Result<RuntimeModelEmbedResponse, RuntimeModelError>
+ Send
+ Sync,
>;
/// Shared, thread-safe host callback that answers a [`RuntimeModelLlmRequest`].
///
/// It returns a [`RuntimeModelLlmResponse`] or a structured [`RuntimeModelError`].
/// Installed through [`set_model_llm_callback`].
pub type RuntimeModelLlmCallback = Arc<
dyn Fn(&RuntimeModelLlmRequest) -> Result<RuntimeModelLlmResponse, RuntimeModelError>
+ Send
+ Sync,
>;
/// Operation requested of the host tool bridge.
///
/// Serialized in snake_case (`"list"`, `"has"`, `"call"`).
// NOTE(review): the exact meaning of each variant is defined by the host callback, not by
// this file — presumably enumerate tools / check existence / invoke; confirm with the host.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeHostToolAction {
List,
Has,
Call,
}
/// Request handed to the registered [`RuntimeHostToolCallback`] by
/// `dispatch_host_tool_request`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeHostToolRequest {
/// Which host tool operation to perform.
pub action: RuntimeHostToolAction,
/// Target tool name; optional (presumably unset for `List` — confirm with callers).
pub tool_name: Option<String>,
/// Arbitrary JSON arguments forwarded to the host unchanged by this module.
pub args: Value,
}
/// Identifying metadata attached to model requests (the `caller` field of
/// [`RuntimeModelEmbedRequest`] and [`RuntimeModelLlmRequest`]).
///
/// Every field is optional; this module neither populates nor interprets them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeModelCaller {
pub skill_id: Option<String>,
pub entry_name: Option<String>,
pub canonical_tool_name: Option<String>,
pub root_name: Option<String>,
pub skill_dir: Option<String>,
pub client_name: Option<String>,
pub request_id: Option<String>,
}
/// Embedding request handed to the registered [`RuntimeModelEmbedCallback`] by
/// `dispatch_model_embed_request`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeModelEmbedRequest {
/// Text to embed.
pub text: String,
/// Metadata describing who issued the request.
pub caller: RuntimeModelCaller,
}
/// Successful result of an embedding callback.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeModelEmbedResponse {
/// Embedding values.
pub vector: Vec<f32>,
/// Reported dimensionality; presumably equals `vector.len()`, but nothing in this
/// module enforces that — TODO confirm with the producer.
pub dimensions: usize,
/// Optional token accounting supplied by the callback.
pub usage: Option<RuntimeModelUsage>,
}
/// LLM request handed to the registered [`RuntimeModelLlmCallback`] by
/// `dispatch_model_llm_request`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeModelLlmRequest {
/// System prompt text.
pub system: String,
/// User prompt text.
pub user: String,
/// Metadata describing who issued the request.
pub caller: RuntimeModelCaller,
}
/// Successful result of an LLM callback.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeModelLlmResponse {
/// Assistant text returned by the callback.
pub assistant: String,
/// Optional token accounting supplied by the callback.
pub usage: Option<RuntimeModelUsage>,
}
/// Optional token counts attached to model responses; either count may be absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeModelUsage {
pub input_tokens: Option<u64>,
pub output_tokens: Option<u64>,
}
/// Structured error returned by model callbacks and by the model dispatch functions.
///
/// Errors synthesized inside this module (missing callback, poisoned registry lock) leave
/// all `provider_*` fields as `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeModelError {
/// Machine-readable error category.
pub code: RuntimeModelErrorCode,
/// Human-readable description.
pub message: String,
/// Provider-supplied details, when available.
pub provider_message: Option<String>,
pub provider_code: Option<String>,
// NOTE(review): presumably an HTTP status code, given the `u16` type — confirm.
pub provider_status: Option<u16>,
}
/// Category of a [`RuntimeModelError`].
///
/// Serialized in snake_case (for example `"model_unavailable"`), which matches the strings
/// produced by `RuntimeModelErrorCode::as_str`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeModelErrorCode {
ModelUnavailable,
InvalidArgument,
ProviderError,
Timeout,
BudgetExceeded,
InternalError,
}
impl RuntimeModelErrorCode {
    /// Returns the stable snake_case identifier for this error code.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::InternalError => "internal_error",
            Self::BudgetExceeded => "budget_exceeded",
            Self::Timeout => "timeout",
            Self::ProviderError => "provider_error",
            Self::InvalidArgument => "invalid_argument",
            Self::ModelUnavailable => "model_unavailable",
        }
    }

    /// Parses a snake_case identifier back into an error code.
    ///
    /// Any string that is not one of the known identifiers — including
    /// `"internal_error"` itself — yields [`Self::InternalError`].
    pub fn from_code_str(value: &str) -> Self {
        let known = [
            ("model_unavailable", Self::ModelUnavailable),
            ("invalid_argument", Self::InvalidArgument),
            ("provider_error", Self::ProviderError),
            ("timeout", Self::Timeout),
            ("budget_exceeded", Self::BudgetExceeded),
        ];
        for (name, code) in known {
            if name == value {
                return code;
            }
        }
        Self::InternalError
    }
}
/// Skill management operation requested of the host.
///
/// Serialized in snake_case (`"install"`, `"update"`, `"uninstall"`, `"enable"`, `"disable"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeSkillManagementAction {
Install,
Update,
Uninstall,
Enable,
Disable,
}
/// Request handed to the registered [`RuntimeSkillManagementCallback`] by
/// `dispatch_skill_management_request`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeSkillManagementRequest {
/// Which management operation to perform.
pub action: RuntimeSkillManagementAction,
/// Authority under which the operation is requested (type defined in `skill::manager`).
pub authority: SkillManagementAuthority,
/// Arbitrary JSON payload forwarded to the host unchanged by this module.
pub input: Value,
}
/// Event delivered to the registered [`RuntimeSkillLifecycleCallback`] by
/// `emit_skill_lifecycle_event`.
///
/// Only `Serialize` is derived; the type cannot be deserialized.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RuntimeSkillLifecycleEvent {
pub plane: SkillOperationPlane,
pub action: SkillLifecycleAction,
pub skill_id: String,
pub root_name: Option<String>,
pub skill_dir: Option<String>,
// `status` is free-form text here; the set of values is chosen by the emitting code.
pub status: String,
pub message: Option<String>,
}
/// Progress event delivered to the registered [`RuntimeSkillOperationProgressCallback`].
///
/// Within this file, events are built by `RuntimeSkillOperationProgressEmitter::emit_detail`.
/// Only `Serialize` is derived; the type cannot be deserialized.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct RuntimeSkillOperationProgressEvent {
/// Identifier shared by all events from one emitter (and its clones).
pub operation_id: String,
/// 1-based counter incremented for every event sent by the same emitter.
pub sequence: u64,
pub plane: SkillOperationPlane,
pub action: SkillLifecycleAction,
pub phase: String,
pub status: String,
pub skill_id: Option<String>,
pub root_name: Option<String>,
pub source_type: Option<SkillInstallSourceType>,
pub source_locator: Option<String>,
pub bytes_done: Option<u64>,
pub bytes_total: Option<u64>,
/// `bytes_done / bytes_total * 100`, capped at 100; `None` when either byte count is
/// missing or `bytes_total` is zero.
pub percent: Option<f64>,
pub message: Option<String>,
}
/// Emits [`RuntimeSkillOperationProgressEvent`]s for one skill operation.
///
/// The sequence counter lives behind an `Arc`, so clones of an emitter share one operation
/// id and one sequence.
#[derive(Clone)]
pub(crate) struct RuntimeSkillOperationProgressEmitter {
operation_id: String,
sequence: Arc<AtomicU64>,
plane: SkillOperationPlane,
action: SkillLifecycleAction,
root_name: Option<String>,
// Fallback skill id used when an emitted detail does not carry its own.
skill_id: Option<String>,
}
impl RuntimeSkillOperationProgressEmitter {
    /// Creates an emitter for one operation.
    ///
    /// The operation id is derived from `action` and `skill_id`; the sequence counter
    /// starts at zero, so the first emitted event carries sequence `1`.
    pub(crate) fn new(
        plane: SkillOperationPlane,
        action: SkillLifecycleAction,
        root_name: Option<String>,
        skill_id: Option<String>,
    ) -> Self {
        let operation_id = build_skill_operation_id(action, skill_id.as_deref());
        let sequence = Arc::new(AtomicU64::new(0));
        Self {
            operation_id,
            sequence,
            plane,
            action,
            root_name,
            skill_id,
        }
    }

    /// Emits an event that carries only a phase, a status and an optional message.
    ///
    /// The emitter's own skill id is used and no source or byte information is attached.
    pub(crate) fn emit(&self, phase: &str, status: &str, message: Option<String>) {
        let detail = RuntimeSkillOperationProgressDetail {
            phase,
            status,
            message,
            skill_id: None,
            source_type: None,
            source_locator: None,
            bytes_done: None,
            bytes_total: None,
        };
        self.emit_detail(detail);
    }

    /// Builds a full progress event from `detail` and forwards it to the registered
    /// progress callback (if any).
    ///
    /// `percent` is only computed when both byte counts are present and the total is
    /// non-zero; it is capped at 100. A skill id in `detail` takes precedence over the
    /// emitter's own.
    pub(crate) fn emit_detail(&self, detail: RuntimeSkillOperationProgressDetail<'_>) {
        let RuntimeSkillOperationProgressDetail {
            phase,
            status,
            skill_id,
            source_type,
            source_locator,
            bytes_done,
            bytes_total,
            message,
        } = detail;

        let percent = bytes_done
            .zip(bytes_total)
            .filter(|&(_, total)| total > 0)
            .map(|(done, total)| ((done as f64 / total as f64) * 100.0).min(100.0));

        let skill_id = match skill_id {
            Some(id) => Some(id.to_owned()),
            None => self.skill_id.clone(),
        };

        // `fetch_add` returns the previous value, hence the `+ 1` for a 1-based sequence.
        let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;

        let event = RuntimeSkillOperationProgressEvent {
            operation_id: self.operation_id.clone(),
            sequence,
            plane: self.plane,
            action: self.action,
            phase: phase.to_string(),
            status: status.to_string(),
            skill_id,
            root_name: self.root_name.clone(),
            source_type,
            source_locator: source_locator.map(str::to_owned),
            bytes_done,
            bytes_total,
            percent,
            message,
        };
        emit_skill_operation_progress_event(&event);
    }

    /// Returns a download callback that reports each [`DownloadProgress`] update as a
    /// `"downloading_archive"` progress event.
    ///
    /// Cache hits are reported with status `"cached"` and a fixed message; everything
    /// else is reported with status `"progress"` and no message. The callback owns a
    /// clone of this emitter, so its events continue the same sequence.
    pub(crate) fn download_callback(
        &self,
        source_type: SkillInstallSourceType,
        skill_id: String,
    ) -> DownloadProgressCallback {
        let emitter = self.clone();
        Arc::new(move |progress: &DownloadProgress| {
            let (status, message) = if progress.cached {
                ("cached", Some("download cache hit".to_string()))
            } else {
                ("progress", None)
            };
            emitter.emit_detail(RuntimeSkillOperationProgressDetail {
                phase: "downloading_archive",
                status,
                skill_id: Some(skill_id.as_str()),
                source_type: Some(source_type),
                source_locator: Some(progress.source_locator.as_str()),
                bytes_done: Some(progress.bytes_done),
                bytes_total: progress.bytes_total,
                message,
            });
        })
    }
}
/// Borrowed per-event inputs for `RuntimeSkillOperationProgressEmitter::emit_detail`.
pub(crate) struct RuntimeSkillOperationProgressDetail<'a> {
pub phase: &'a str,
pub status: &'a str,
/// When `Some`, overrides the emitter's own skill id for this event.
pub skill_id: Option<&'a str>,
pub source_type: Option<SkillInstallSourceType>,
pub source_locator: Option<&'a str>,
/// Byte counts; `emit_detail` derives the event's `percent` from these.
pub bytes_done: Option<u64>,
pub bytes_total: Option<u64>,
pub message: Option<String>,
}
/// Change set delivered to the registered [`RuntimeEntryRegistryCallback`] by
/// `emit_entry_registry_delta`.
///
/// Only `Serialize` is derived; the type cannot be deserialized.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RuntimeEntryRegistryDelta {
pub added_entries: Vec<RuntimeEntryDescriptor>,
pub removed_entry_names: Vec<String>,
pub updated_entries: Vec<RuntimeEntryDescriptor>,
}
/// Installs (`Some`) or clears (`None`) the process-wide skill lifecycle callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_skill_lifecycle_callback(callback: Option<RuntimeSkillLifecycleCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *skill_lifecycle_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide skill operation progress
/// callback, replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_skill_operation_progress_callback(
    callback: Option<RuntimeSkillOperationProgressCallback>,
) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *skill_operation_progress_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide entry registry callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_entry_registry_callback(callback: Option<RuntimeEntryRegistryCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *entry_registry_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide skill management callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_skill_management_callback(callback: Option<RuntimeSkillManagementCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *skill_management_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide host tool callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_host_tool_callback(callback: Option<RuntimeHostToolCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *host_tool_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide model embedding callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_model_embed_callback(callback: Option<RuntimeModelEmbedCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *model_embed_callback_registry().lock().unwrap() = callback;
}
/// Installs (`Some`) or clears (`None`) the process-wide model LLM callback,
/// replacing whatever was registered before.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn set_model_llm_callback(callback: Option<RuntimeModelLlmCallback>) {
    // The temporary guard lives until the end of this statement, so the previous
    // callback is released while the lock is still held.
    *model_llm_callback_registry().lock().unwrap() = callback;
}
/// Test-only guard returned by `runtime_model_callback_test_guard`.
///
/// It holds a process-wide test mutex for as long as it is alive, and its `Drop` impl
/// clears the model embed and LLM callbacks.
#[cfg(test)]
pub(crate) struct RuntimeModelCallbackTestGuard {
_guard: std::sync::MutexGuard<'static, ()>,
}
#[cfg(test)]
impl Drop for RuntimeModelCallbackTestGuard {
/// Clears both model callbacks.
///
/// `drop` runs before the `_guard` field itself is dropped, so the callbacks are reset
/// while the test mutex is still held.
fn drop(&mut self) {
set_model_embed_callback(None);
set_model_llm_callback(None);
}
}
/// Acquires the process-wide test lock for the model callbacks and resets both of them.
///
/// Tests that install model embed/LLM callbacks should hold the returned guard for their
/// whole duration: it serializes them against each other, and dropping it clears the
/// callbacks again.
///
/// A poisoned lock is recovered rather than treated as fatal. The mutex protects no data
/// (`()`), so poison only means that an earlier test panicked while holding the guard.
/// Panicking here as well would make every later test fail for an unrelated reason.
#[cfg(test)]
pub(crate) fn runtime_model_callback_test_guard() -> RuntimeModelCallbackTestGuard {
    static GUARD: OnceLock<Mutex<()>> = OnceLock::new();
    let guard = match GUARD.get_or_init(|| Mutex::new(())).lock() {
        Ok(guard) => guard,
        // An earlier test panicked while holding the guard; there is no state to repair.
        Err(poisoned) => poisoned.into_inner(),
    };
    // Start every guarded test from a clean slate, even if a previous holder leaked state.
    set_model_embed_callback(None);
    set_model_llm_callback(None);
    RuntimeModelCallbackTestGuard { _guard: guard }
}
/// Delivers `event` to the registered skill lifecycle callback; does nothing when no
/// callback is registered.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub(crate) fn emit_skill_lifecycle_event(event: &RuntimeSkillLifecycleEvent) {
    // Clone the handle out of the registry; the temporary guard is released at the end of
    // this statement, so the callback runs without the lock held.
    let listener = skill_lifecycle_callback_registry().lock().unwrap().clone();
    if let Some(notify) = listener {
        notify(event);
    }
}
/// Delivers `event` to the registered skill operation progress callback; does nothing
/// when no callback is registered.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub(crate) fn emit_skill_operation_progress_event(event: &RuntimeSkillOperationProgressEvent) {
    // Clone the handle out of the registry; the temporary guard is released at the end of
    // this statement, so the callback runs without the lock held.
    let listener = skill_operation_progress_callback_registry()
        .lock()
        .unwrap()
        .clone();
    if let Some(notify) = listener {
        notify(event);
    }
}
/// Delivers `delta` to the registered entry registry callback; does nothing when no
/// callback is registered.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub(crate) fn emit_entry_registry_delta(delta: &RuntimeEntryRegistryDelta) {
    // Clone the handle out of the registry; the temporary guard is released at the end of
    // this statement, so the callback runs without the lock held.
    let listener = entry_registry_callback_registry().lock().unwrap().clone();
    if let Some(notify) = listener {
        notify(delta);
    }
}
/// Forwards `request` to the registered skill management callback and returns its result.
///
/// # Errors
///
/// Returns an error message when the registry lock is poisoned, when no callback is
/// registered, or when the callback itself fails.
pub(crate) fn dispatch_skill_management_request(
    request: &RuntimeSkillManagementRequest,
) -> Result<Value, String> {
    // The temporary guard is released at the end of this statement, so the callback is
    // invoked without the registry lock held.
    let handler = skill_management_callback_registry()
        .lock()
        .map_err(|_| "Skill management callback registry lock poisoned".to_string())?
        .clone();
    match handler {
        Some(handler) => handler(request),
        None => Err(
            "Runtime skill management bridge is enabled but no host callback is registered"
                .to_string(),
        ),
    }
}
/// Forwards `request` to the registered host tool callback and returns its result.
///
/// # Errors
///
/// Returns an error message when the registry lock is poisoned, when no callback is
/// registered, or when the callback itself fails.
pub(crate) fn dispatch_host_tool_request(
    request: &RuntimeHostToolRequest,
) -> Result<Value, String> {
    // The temporary guard is released at the end of this statement, so the callback is
    // invoked without the registry lock held.
    let handler = host_tool_callback_registry()
        .lock()
        .map_err(|_| "Host tool callback registry lock poisoned".to_string())?
        .clone();
    match handler {
        Some(handler) => handler(request),
        None => Err("Host tool bridge is enabled but no host callback is registered".to_string()),
    }
}
/// Builds a [`RuntimeModelError`] with code `InternalError`, the given message and no
/// provider details.
fn model_internal_error(message: impl Into<String>) -> RuntimeModelError {
    let message = message.into();
    let code = RuntimeModelErrorCode::InternalError;
    RuntimeModelError {
        code,
        message,
        provider_message: None,
        provider_code: None,
        provider_status: None,
    }
}
/// Builds a [`RuntimeModelError`] with code `ModelUnavailable`, the given message and no
/// provider details.
fn model_unavailable_error(message: impl Into<String>) -> RuntimeModelError {
    let message = message.into();
    let code = RuntimeModelErrorCode::ModelUnavailable;
    RuntimeModelError {
        code,
        message,
        provider_message: None,
        provider_code: None,
        provider_status: None,
    }
}
/// Forwards `request` to the registered model embedding callback and returns its result.
///
/// # Errors
///
/// Returns an `InternalError` when the registry lock is poisoned, a `ModelUnavailable`
/// error when no callback is registered, or whatever error the callback itself reports.
pub(crate) fn dispatch_model_embed_request(
    request: &RuntimeModelEmbedRequest,
) -> Result<RuntimeModelEmbedResponse, RuntimeModelError> {
    // The temporary guard is released at the end of this statement, so the callback is
    // invoked without the registry lock held.
    let handler = model_embed_callback_registry()
        .lock()
        .map_err(|_| model_internal_error("Model embed callback registry lock poisoned"))?
        .clone();
    match handler {
        Some(handler) => handler(request),
        None => Err(model_unavailable_error(
            "embedding callback is not registered",
        )),
    }
}
/// Forwards `request` to the registered model LLM callback and returns its result.
///
/// # Errors
///
/// Returns an `InternalError` when the registry lock is poisoned, a `ModelUnavailable`
/// error when no callback is registered, or whatever error the callback itself reports.
pub(crate) fn dispatch_model_llm_request(
    request: &RuntimeModelLlmRequest,
) -> Result<RuntimeModelLlmResponse, RuntimeModelError> {
    // The temporary guard is released at the end of this statement, so the callback is
    // invoked without the registry lock held.
    let handler = model_llm_callback_registry()
        .lock()
        .map_err(|_| model_internal_error("Model llm callback registry lock poisoned"))?
        .clone();
    match handler {
        Some(handler) => handler(request),
        None => Err(model_unavailable_error("llm callback is not registered")),
    }
}
/// Reports whether a skill management callback is currently registered.
///
/// # Errors
///
/// Returns an error message if the registry lock is poisoned.
pub(crate) fn try_has_skill_management_callback() -> Result<bool, String> {
    match skill_management_callback_registry().lock() {
        Ok(slot) => Ok(slot.is_some()),
        Err(_) => Err("Skill management callback registry lock poisoned".to_string()),
    }
}
/// Reports whether a host tool callback is currently registered.
///
/// # Errors
///
/// Returns an error message if the registry lock is poisoned.
pub(crate) fn try_has_host_tool_callback() -> Result<bool, String> {
    match host_tool_callback_registry().lock() {
        Ok(slot) => Ok(slot.is_some()),
        Err(_) => Err("Host tool callback registry lock poisoned".to_string()),
    }
}
/// Reports whether a model embedding callback is currently registered.
///
/// # Errors
///
/// Returns an error message if the registry lock is poisoned.
pub(crate) fn try_has_model_embed_callback() -> Result<bool, String> {
    match model_embed_callback_registry().lock() {
        Ok(slot) => Ok(slot.is_some()),
        Err(_) => Err("Model embed callback registry lock poisoned".to_string()),
    }
}
/// Reports whether a model LLM callback is currently registered.
///
/// # Errors
///
/// Returns an error message if the registry lock is poisoned.
pub(crate) fn try_has_model_llm_callback() -> Result<bool, String> {
    match model_llm_callback_registry().lock() {
        Ok(slot) => Ok(slot.is_some()),
        Err(_) => Err("Model llm callback registry lock poisoned".to_string()),
    }
}
/// Builds an operation id of the form `skill-<action>-<skill>-<unix-millis>`.
///
/// Every character of `skill_id` other than ASCII letters, digits, `-` and `_` is
/// replaced with `-`; a missing or empty id becomes `unknown`. The timestamp falls back
/// to `0` if the system clock reads earlier than the Unix epoch.
///
/// The timestamp has millisecond resolution, so two ids built for the same action and
/// skill within the same millisecond are identical.
fn build_skill_operation_id(action: SkillLifecycleAction, skill_id: Option<&str>) -> String {
    let action_name = match action {
        SkillLifecycleAction::Install => "install",
        SkillLifecycleAction::Update => "update",
        SkillLifecycleAction::Reload => "reload",
        SkillLifecycleAction::Uninstall => "uninstall",
        SkillLifecycleAction::Enable => "enable",
        SkillLifecycleAction::Disable => "disable",
    };

    let sanitized: String = skill_id
        .unwrap_or_default()
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
                ch
            } else {
                '-'
            }
        })
        .collect();
    let skill_fragment = if sanitized.is_empty() {
        "unknown"
    } else {
        sanitized.as_str()
    };

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis());

    format!("skill-{action_name}-{skill_fragment}-{timestamp}")
}
/// Returns the process-wide slot holding the optional skill lifecycle callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn skill_lifecycle_callback_registry() -> &'static Mutex<Option<RuntimeSkillLifecycleCallback>> {
    type Slot = Mutex<Option<RuntimeSkillLifecycleCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional skill operation progress callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn skill_operation_progress_callback_registry()
-> &'static Mutex<Option<RuntimeSkillOperationProgressCallback>> {
    type Slot = Mutex<Option<RuntimeSkillOperationProgressCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional entry registry callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn entry_registry_callback_registry() -> &'static Mutex<Option<RuntimeEntryRegistryCallback>> {
    type Slot = Mutex<Option<RuntimeEntryRegistryCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional skill management callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn skill_management_callback_registry() -> &'static Mutex<Option<RuntimeSkillManagementCallback>> {
    type Slot = Mutex<Option<RuntimeSkillManagementCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional host tool callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn host_tool_callback_registry() -> &'static Mutex<Option<RuntimeHostToolCallback>> {
    type Slot = Mutex<Option<RuntimeHostToolCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional model embedding callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn model_embed_callback_registry() -> &'static Mutex<Option<RuntimeModelEmbedCallback>> {
    type Slot = Mutex<Option<RuntimeModelEmbedCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
/// Returns the process-wide slot holding the optional model LLM callback.
///
/// The slot is created empty (`None`) on first access and lives for the rest of the process.
fn model_llm_callback_registry() -> &'static Mutex<Option<RuntimeModelLlmCallback>> {
    type Slot = Mutex<Option<RuntimeModelLlmCallback>>;
    static SLOT: OnceLock<Slot> = OnceLock::new();
    SLOT.get_or_init(Slot::default)
}
#[cfg(test)]
mod tests {
    use super::{
        RuntimeSkillOperationProgressCallback, RuntimeSkillOperationProgressDetail,
        RuntimeSkillOperationProgressEmitter, RuntimeSkillOperationProgressEvent,
        set_skill_operation_progress_callback,
    };
    use crate::skill::manager::{SkillLifecycleAction, SkillOperationPlane};
    use crate::skill::source::SkillInstallSourceType;
    use std::sync::{Arc, Mutex, MutexGuard, OnceLock};

    /// Serializes tests that install the process-wide progress callback.
    ///
    /// A poisoned lock is recovered: the mutex guards no data, so poison only means that
    /// another test panicked while holding it.
    fn progress_callback_test_guard() -> MutexGuard<'static, ()> {
        static TEST_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
        TEST_MUTEX
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// One emitter must number its events 1, 2, ... and derive `percent` from byte counts.
    #[test]
    fn progress_emitter_reports_sequence_and_percent() {
        let _serial = progress_callback_test_guard();

        // Record every event the callback receives.
        let recorded: Arc<Mutex<Vec<RuntimeSkillOperationProgressEvent>>> = Arc::default();
        let sink = Arc::clone(&recorded);
        let callback: RuntimeSkillOperationProgressCallback =
            Arc::new(move |event: &RuntimeSkillOperationProgressEvent| {
                let mut events = sink.lock().expect("capture progress events");
                events.push(event.clone());
            });
        set_skill_operation_progress_callback(Some(callback));

        let emitter = RuntimeSkillOperationProgressEmitter::new(
            SkillOperationPlane::System,
            SkillLifecycleAction::Install,
            Some("ROOT".to_string()),
            Some("demo-skill".to_string()),
        );
        let download_detail = RuntimeSkillOperationProgressDetail {
            phase: "downloading_archive",
            status: "progress",
            skill_id: Some("demo-skill"),
            source_type: Some(SkillInstallSourceType::OfficialHub),
            source_locator: Some("https://hub.example.invalid/demo.zip"),
            bytes_done: Some(5),
            bytes_total: Some(10),
            message: None,
        };
        emitter.emit_detail(download_detail);
        emitter.emit("completed", "completed", Some("done".to_string()));
        set_skill_operation_progress_callback(None);

        let events = recorded.lock().expect("read captured progress events");
        // Other code may emit while the callback is installed, so only the events that
        // share this emitter's operation id are inspected.
        let download_event = events
            .iter()
            .find(|event| {
                event.phase == "downloading_archive"
                    && event.skill_id.as_deref() == Some("demo-skill")
            })
            .expect("download progress event should be captured");
        let operation_events: Vec<_> = events
            .iter()
            .filter(|event| event.operation_id == download_event.operation_id)
            .collect();

        assert_eq!(operation_events.len(), 2);
        let (first, second) = (operation_events[0], operation_events[1]);
        assert_eq!(first.sequence, 1);
        assert_eq!(second.sequence, 2);
        assert_eq!(first.percent, Some(50.0));
        assert_eq!(
            first.source_type,
            Some(SkillInstallSourceType::OfficialHub)
        );
    }
}