use crate::host::database::RuntimeDatabaseBindingContext;
use crate::host::options::{LuaRuntimeHostOptions, LuaRuntimeSpaceControllerProcessMode};
use sha2::{Digest, Sha256};
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::runtime::{Handle, Runtime};
use vldb_controller_client::{
BoxError, ClientRegistration, ControllerClient, ControllerClientConfig, ControllerProcessMode,
SpaceKind, SpaceRegistration,
};
/// Synchronous bridge between the Lua runtime host and the space controller client.
///
/// The bridge owns a dedicated tokio runtime on which every controller request is
/// driven, so callers can issue requests through a blocking API.
pub struct LuaRuntimeSpaceControllerBridge {
/// Controller client; a clone of it is handed to every operation.
client: ControllerClient,
/// Dedicated tokio runtime. `run` holds this lock for the whole duration of a request.
runtime: Mutex<Runtime>,
/// Client session id reported by the controller at construction time; used as the
/// scope suffix of controller binding ids.
binding_scope_id: String,
}
impl LuaRuntimeSpaceControllerBridge {
    /// Creates a bridge, connects its controller client and resolves the session scope id.
    ///
    /// * `host_options` - host configuration; only `space_controller` is read.
    /// * `backend_suffix` - becomes part of the registered client name and is also
    ///   reported as the process name.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the tokio runtime cannot be created, when the
    /// client fails to connect, or when the controller does not report exactly one
    /// visible client session.
    pub fn new(
        host_options: &LuaRuntimeHostOptions,
        backend_suffix: &str,
    ) -> Result<Arc<Self>, String> {
        let controller_options = &host_options.space_controller;
        let endpoint = controller_options
            .endpoint
            .clone()
            .unwrap_or_else(|| "http://127.0.0.1:19801".to_string());
        let process_id = std::process::id();
        // Milliseconds since the Unix epoch; falls back to 0 when the system clock reads
        // earlier than the epoch.
        let started_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|duration| duration.as_millis())
            .unwrap_or_default();
        let registration = ClientRegistration {
            client_name: format!(
                "vulcan-luaskills-{}-{}-{}",
                process_id, backend_suffix, started_at_ms
            ),
            host_kind: "vulcan_luaskills".to_string(),
            process_id,
            process_name: backend_suffix.to_string(),
            lease_ttl_secs: Some(controller_options.default_lease_ttl_secs),
        };
        let config = ControllerClientConfig {
            endpoint,
            auto_spawn: controller_options.auto_spawn,
            spawn_executable: controller_options
                .executable_path
                .as_ref()
                .map(|path| path.to_string_lossy().to_string()),
            spawn_process_mode: map_process_mode(controller_options.process_mode),
            minimum_uptime_secs: controller_options.minimum_uptime_secs,
            idle_timeout_secs: controller_options.idle_timeout_secs,
            default_lease_ttl_secs: controller_options.default_lease_ttl_secs,
            connect_timeout_secs: controller_options.connect_timeout_secs,
            startup_timeout_secs: controller_options.startup_timeout_secs,
            startup_retry_interval_ms: controller_options.startup_retry_interval_ms,
            lease_renew_interval_secs: controller_options.lease_renew_interval_secs,
        };
        let runtime = Runtime::new()
            .map_err(|error| format!("failed to create controller tokio runtime: {}", error))?;
        let client = ControllerClient::new(config, registration);
        // Connect first, then resolve the session scope; the first failure wins.
        let handshake =
            run_controller_operation_with_client(&runtime, &client, |client| async move {
                client.connect().await
            })
            .map_err(|error| format!("failed to connect space controller client: {}", error))
            .and_then(|_| {
                resolve_controller_binding_scope_id(&runtime, &client).map_err(|error| {
                    format!(
                        "failed to resolve space controller session scope: {}",
                        error
                    )
                })
            });
        let binding_scope_id = match handshake {
            Ok(scope_id) => scope_id,
            Err(message) => {
                // Keep the teardown order of an implicit drop: client first, runtime second.
                drop(client);
                // Implicitly dropping a tokio runtime panics when the caller is itself
                // inside an async context, which would turn this error into a panic.
                // A background shutdown releases the runtime without blocking.
                runtime.shutdown_background();
                return Err(message);
            }
        };
        Ok(Arc::new(Self {
            client,
            runtime: Mutex::new(runtime),
            binding_scope_id,
        }))
    }

    /// Runs one controller operation to completion and returns its result.
    ///
    /// `operation` receives a clone of the controller client and returns the future to
    /// drive. The runtime lock is held until the operation finishes, so concurrent
    /// callers are serialized.
    ///
    /// # Errors
    ///
    /// Returns a message when the runtime lock is poisoned or when the operation fails.
    pub fn run<F, Fut, T>(&self, operation: F) -> Result<T, String>
    where
        F: FnOnce(ControllerClient) -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, BoxError>> + Send + 'static,
        T: Send + 'static,
    {
        let runtime = self
            .runtime
            .lock()
            .map_err(|_| "controller runtime lock poisoned".to_string())?;
        run_controller_operation_with_client(&runtime, &self.client, operation)
            .map_err(|error| format!("space controller request failed: {}", error))
    }

    /// Attaches the space described by `binding` through the controller client.
    ///
    /// The space id is derived with `controller_space_id_for_binding` and the space kind
    /// with `map_space_kind`; the value returned by the controller call is discarded.
    ///
    /// # Errors
    ///
    /// Propagates the error message produced by [`Self::run`].
    pub fn attach_binding(&self, binding: &RuntimeDatabaseBindingContext) -> Result<(), String> {
        let registration = SpaceRegistration {
            space_id: controller_space_id_for_binding(binding),
            space_label: binding.space_label.clone(),
            space_kind: map_space_kind(&binding.space_label),
            space_root: binding.space_root.clone(),
        };
        self.run(move |client| async move { client.attach_space(registration).await })
            .map(|_| ())
    }

    /// Returns the controller binding id for `binding`: its binding tag followed by `@`
    /// and this bridge's session scope id.
    pub fn controller_binding_id_for_binding(
        &self,
        binding: &RuntimeDatabaseBindingContext,
    ) -> String {
        build_controller_binding_id(binding.binding_tag.as_str(), self.binding_scope_id.as_str())
    }
}
/// Builds the request future from `operation` and drives it on the bridge runtime.
///
/// The operation receives its own clone of `client`, so the resulting future does not
/// borrow from the caller.
fn run_controller_operation_with_client<F, Fut, T>(
    runtime: &Runtime,
    client: &ControllerClient,
    operation: F,
) -> Result<T, BoxError>
where
    F: FnOnce(ControllerClient) -> Fut + Send + 'static,
    Fut: Future<Output = Result<T, BoxError>> + Send + 'static,
    T: Send + 'static,
{
    let pending_request = operation(client.clone());
    run_future_on_bridge_runtime(runtime, pending_request)
}
/// Resolves the session id that scopes this client's controller bindings.
///
/// Lists the clients reported by `list_clients` and requires exactly one entry; its
/// `client_session_id` is returned.
///
/// # Errors
///
/// Fails when the listing request fails, or when zero or more than one client session
/// is reported.
fn resolve_controller_binding_scope_id(
    runtime: &Runtime,
    client: &ControllerClient,
) -> Result<String, BoxError> {
    run_controller_operation_with_client(runtime, client, |client| async move {
        let mut visible_sessions = client.list_clients().await?.into_iter();
        let Some(only_session) = visible_sessions.next() else {
            return Err::<String, BoxError>(
                "space controller client did not expose one visible client session".into(),
            );
        };
        match visible_sessions.next() {
            // A second entry makes the scope ambiguous.
            Some(_) => Err::<String, BoxError>(
                "space controller client exposed multiple visible client sessions".into(),
            ),
            None => Ok(only_session.client_session_id),
        }
    })
}
/// Drives `future` to completion on the bridge runtime and returns its output.
///
/// When the calling thread is already inside a tokio runtime context, the future is
/// spawned through the bridge runtime's handle and awaited over a channel instead of
/// calling `block_on`. Otherwise `block_on` is used directly.
fn run_future_on_bridge_runtime<Fut, T>(runtime: &Runtime, future: Fut) -> Result<T, BoxError>
where
    Fut: Future<Output = Result<T, BoxError>> + Send + 'static,
    T: Send + 'static,
{
    let caller_is_inside_tokio = Handle::try_current().is_ok();
    if caller_is_inside_tokio {
        run_future_on_bridge_runtime_handle(runtime.handle().clone(), future)
    } else {
        runtime.block_on(future)
    }
}
/// Spawns `future` on the runtime behind `runtime_handle` and blocks the calling thread
/// until the spawned task reports its result.
///
/// # Errors
///
/// Returns the future's own error, or a "channel closed" error when the task ends
/// without sending a result.
fn run_future_on_bridge_runtime_handle<Fut, T>(
    runtime_handle: Handle,
    future: Fut,
) -> Result<T, BoxError>
where
    Fut: Future<Output = Result<T, BoxError>> + Send + 'static,
    T: Send + 'static,
{
    // One slot is enough: the task sends exactly one result.
    let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
    runtime_handle.spawn(async move {
        // A send failure only means the receiver is gone; nothing is left to notify.
        let _ = result_tx.send(future.await);
    });
    match result_rx.recv() {
        Ok(outcome) => outcome,
        Err(_) => Err("space controller task channel closed".into()),
    }
}
impl Drop for LuaRuntimeSpaceControllerBridge {
    /// Best-effort shutdown of the controller client.
    ///
    /// A detached thread builds a throwaway tokio runtime and waits at most 250 ms for
    /// `client.shutdown()`. Every failure (thread spawn, runtime creation, timeout,
    /// shutdown error) is ignored.
    fn drop(&mut self) {
        let shutdown_client = self.client.clone();
        let shutdown_task = move || {
            if let Ok(shutdown_runtime) = Runtime::new() {
                shutdown_runtime.block_on(async move {
                    let grace_period = Duration::from_millis(250);
                    let _ = tokio::time::timeout(grace_period, shutdown_client.shutdown()).await;
                });
            }
        };
        let _ = thread::Builder::new()
            .name("vulcan-space-controller-shutdown".to_string())
            .spawn(shutdown_task);
    }
}
/// Translates the host-side process mode option into the controller client's
/// equivalent variant.
fn map_process_mode(mode: LuaRuntimeSpaceControllerProcessMode) -> ControllerProcessMode {
    match mode {
        LuaRuntimeSpaceControllerProcessMode::Managed => ControllerProcessMode::Managed,
        LuaRuntimeSpaceControllerProcessMode::Service => ControllerProcessMode::Service,
    }
}
/// Derives the space kind from a space label.
///
/// After trimming surrounding whitespace, `ROOT` and `USER` (ASCII case-insensitive)
/// select the matching kind; every other label is treated as a project space.
fn map_space_kind(space_label: &str) -> SpaceKind {
    let label = space_label.trim();
    if label.eq_ignore_ascii_case("ROOT") {
        SpaceKind::Root
    } else if label.eq_ignore_ascii_case("USER") {
        SpaceKind::User
    } else {
        SpaceKind::Project
    }
}
/// Builds the controller space id for a binding.
///
/// The id is the normalized space label, a `-`, and the first 16 hex characters of a
/// SHA-256 digest over the trimmed label, a zero byte separator and the space root.
pub fn controller_space_id_for_binding(binding: &RuntimeDatabaseBindingContext) -> String {
    let mut hasher = Sha256::new();
    hasher.update(binding.space_label.trim().as_bytes());
    // The zero byte separates label from root in the hashed input.
    hasher.update([0]);
    hasher.update(binding.space_root.as_bytes());
    let fingerprint = format!("{:x}", hasher.finalize());
    let label_prefix = normalize_controller_space_label(&binding.space_label);
    format!("{}-{}", label_prefix, &fingerprint[..16])
}
/// Joins a binding tag and a binding scope id into `<tag>@<scope>`.
fn build_controller_binding_id(binding_tag: &str, binding_scope_id: &str) -> String {
    [binding_tag, binding_scope_id].join("@")
}
/// Normalizes a space label for use as an id prefix.
///
/// Surrounding whitespace is trimmed, ASCII letters are upper-cased, ASCII digits are
/// kept, and every other character becomes `_`. A label that is empty after trimming
/// yields `SPACE`.
fn normalize_controller_space_label(space_label: &str) -> String {
    let trimmed = space_label.trim();
    if trimmed.is_empty() {
        return "SPACE".to_string();
    }
    let mut label = String::with_capacity(trimmed.len());
    for ch in trimmed.chars() {
        let mapped = if ch.is_ascii_alphanumeric() {
            ch.to_ascii_uppercase()
        } else {
            '_'
        };
        label.push(mapped);
    }
    label
}
// Unit tests for the runtime bridging helper and the binding id format.
#[cfg(test)]
mod tests {
use super::{build_controller_binding_id, run_future_on_bridge_runtime};
use tokio::runtime::{Builder, Runtime};
use vldb_controller_client::BoxError;
// Builds a default tokio runtime, the same way the bridge constructor does.
fn build_bridge_runtime() -> Runtime {
Runtime::new().expect("bridge runtime should build")
}
// Caller is a plain thread with no tokio context: exercises the `block_on` path.
#[test]
fn bridge_runtime_executes_futures_for_sync_callers() {
let runtime = build_bridge_runtime();
let result = run_future_on_bridge_runtime(&runtime, async { Ok::<_, BoxError>(7usize) })
.expect("sync caller path should succeed");
assert_eq!(result, 7);
}
// Caller is already inside a current-thread tokio runtime: exercises the spawn-and-wait
// path. The bridge runtime is created before, and dropped after, the host `block_on`,
// i.e. outside of the host runtime's async context.
#[test]
fn bridge_runtime_executes_futures_inside_current_thread_tokio_runtime() {
let bridge_runtime = build_bridge_runtime();
let host_runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("current-thread host runtime should build");
// `catch_unwind` turns a panic on this path into a test failure with a clear message.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
host_runtime.block_on(async {
run_future_on_bridge_runtime(&bridge_runtime, async { Ok::<_, BoxError>(11usize) })
.expect("current-thread caller path should succeed")
})
}))
.expect("current-thread host runtime path should not panic");
assert_eq!(result, 11);
}
// The binding id keeps the tag verbatim and appends `@<scope id>`.
#[test]
fn controller_binding_id_preserves_tag_and_adds_scope_suffix() {
assert_eq!(
build_controller_binding_id("ROOT-vulcan-ai-memory", "client-session-123"),
"ROOT-vulcan-ai-memory@client-session-123"
);
}
}