#![deny(unsafe_code)]
mod cache;
mod cell;
mod engine;
mod error;
mod inspect;
mod module;
mod plugin;
pub mod policy;
mod probe;
pub use cache::{DEFAULT_ALIGN, call_filter_hook, call_observe_hook, call_shape_hook};
pub use cc_lb_plugin_wire::schema::HookKind;
pub use cc_lb_plugin_wire::schema::HookKind as SlotKind;
pub use cell::{PluginCell, PluginSlot};
pub use engine::{HostState, HotEngineConfig, build_hot_engine};
pub use error::WasmtimeRuntimeError;
pub use inspect::{ModuleInspection, inspect_wasm};
pub use module::{admit_wasm, compile_module};
pub use plugin::{WasmtimeFilterPlugin, WasmtimeObservabilityHookPlugin, WasmtimeUpstreamDialect};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use cc_lb_plugin_api::SlotKey;
use parking_lot::RwLock;
use wasmtime::{Engine, Linker};
/// Owner of the Wasmtime engine, its linker, the engine configuration and the
/// table of registered plugin slots.
///
/// The slot table is the only mutable part; it sits behind a `parking_lot`
/// read/write lock so every method takes `&self`.
pub struct WasmtimeRuntime {
    /// Engine produced by `build_hot_engine` from the configuration.
    engine: Arc<Engine>,
    /// Linker created for `engine`; handed to module admission together with it.
    linker: Arc<Linker<HostState>>,
    /// Configuration the engine was built from (`memory_max_pages` and the
    /// pool totals are read back from here by the methods).
    config: Arc<HotEngineConfig>,
    /// Registered plugin slots, keyed by `SlotKey`.
    slots: RwLock<HashMap<SlotKey, Arc<PluginSlot>>>,
}
impl WasmtimeRuntime {
    /// Builds a runtime from `config`.
    ///
    /// The engine comes from `build_hot_engine`, a fresh `Linker` is created
    /// for it, and the slot table starts out empty.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `build_hot_engine`.
    pub fn new(config: HotEngineConfig) -> Result<Self, WasmtimeRuntimeError> {
        let engine = build_hot_engine(&config)?;
        let linker = Linker::new(&engine);
        Ok(Self {
            engine: Arc::new(engine),
            linker: Arc::new(linker),
            config: Arc::new(config),
            slots: RwLock::new(HashMap::new()),
        })
    }

    /// Builds a runtime from `HotEngineConfig::default()`.
    ///
    /// # Errors
    ///
    /// Same as [`WasmtimeRuntime::new`].
    pub fn with_defaults() -> Result<Self, WasmtimeRuntimeError> {
        Self::new(HotEngineConfig::default())
    }

    /// Borrows the underlying Wasmtime engine.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }

    /// Borrows the configuration this runtime was built from.
    pub fn config(&self) -> &HotEngineConfig {
        &self.config
    }

    /// Returns another shared handle to the configuration.
    pub fn config_arc(&self) -> Arc<HotEngineConfig> {
        Arc::clone(&self.config)
    }

    /// Borrows the linker that is passed to module admission.
    pub fn linker(&self) -> &Linker<HostState> {
        &self.linker
    }

    /// Runs `module::admit_wasm` on `wasm_bytes` for the given hook `kind`
    /// using this runtime's engine, linker and `memory_max_pages` limit.
    ///
    /// Only the inspection half of the admission result is returned; the first
    /// element is discarded and no slot is registered.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `module::admit_wasm`.
    pub fn admit_wasm(
        &self,
        kind: HookKind,
        wasm_bytes: &[u8],
    ) -> Result<ModuleInspection, WasmtimeRuntimeError> {
        let (_, inspection) = module::admit_wasm(
            &self.engine,
            &self.linker,
            kind,
            wasm_bytes,
            self.config.memory_max_pages,
        )?;
        Ok(inspection)
    }

    /// Registers `wasm_bytes` under `slot_key` as a `Filter` plugin.
    ///
    /// # Errors
    ///
    /// Fails if admission of the module fails, or if `slot_key` is already
    /// registered with a different kind.
    pub fn register_filter(
        &self,
        slot_key: SlotKey,
        name: impl Into<String>,
        wasm_bytes: &[u8],
    ) -> Result<Arc<PluginSlot>, WasmtimeRuntimeError> {
        self.register(SlotKind::Filter, slot_key, name, wasm_bytes)
    }

    /// Registers `wasm_bytes` under `slot_key` as a `Shape` plugin.
    ///
    /// # Errors
    ///
    /// Fails if admission of the module fails, or if `slot_key` is already
    /// registered with a different kind.
    pub fn register_shape(
        &self,
        slot_key: SlotKey,
        name: impl Into<String>,
        wasm_bytes: &[u8],
    ) -> Result<Arc<PluginSlot>, WasmtimeRuntimeError> {
        self.register(SlotKind::Shape, slot_key, name, wasm_bytes)
    }

    /// Registers `wasm_bytes` under `slot_key` as an `Observe` plugin.
    ///
    /// # Errors
    ///
    /// Fails if admission of the module fails, or if `slot_key` is already
    /// registered with a different kind.
    pub fn register_observe(
        &self,
        slot_key: SlotKey,
        name: impl Into<String>,
        wasm_bytes: &[u8],
    ) -> Result<Arc<PluginSlot>, WasmtimeRuntimeError> {
        self.register(SlotKind::Observe, slot_key, name, wasm_bytes)
    }

    /// Shared implementation behind the `register_*` methods.
    ///
    /// Steps, in order:
    /// 1. Hash the module bytes together with `memory_max_pages`.
    /// 2. Under the read lock, return the existing slot untouched when it has
    ///    the same kind and its current cell carries the same hash.
    /// 3. Otherwise admit the module (outside any lock) and build a cell with
    ///    `version_id` 1.
    /// 4. Under the write lock, re-check the table: reject a kind mismatch,
    ///    return the slot as-is when the hash already matches, store a cell
    ///    with `version_id` incremented when the content differs, or insert a
    ///    brand-new slot when the key is absent.
    ///
    /// `name` is only converted to a `String` after the fast path in step 2.
    ///
    /// # Errors
    ///
    /// Propagates errors from `module::admit_wasm`, and returns
    /// `WasmtimeRuntimeError::ModuleRejected` when `slot_key` is already
    /// registered with a different kind.
    fn register(
        &self,
        kind: SlotKind,
        slot_key: SlotKey,
        name: impl Into<String>,
        wasm_bytes: &[u8],
    ) -> Result<Arc<PluginSlot>, WasmtimeRuntimeError> {
        let new_content_hash =
            module::compute_content_hash(wasm_bytes, self.config.memory_max_pages);

        // Fast path: identical content is already live, so skip admission.
        {
            let slots = self.slots.read();
            if let Some(existing) = slots.get(&slot_key)
                && existing.kind == kind
                && existing.current.load().content_hash == new_content_hash
            {
                return Ok(Arc::clone(existing));
            }
        }

        let name_string: String = name.into();
        let plugin_name: Arc<str> = Arc::from(name_string.as_str());
        // Admission runs without holding the slot-table lock.
        let (instance_pre, inspection) = module::admit_wasm(
            &self.engine,
            &self.linker,
            kind,
            wasm_bytes,
            self.config.memory_max_pages,
        )?;
        let new_cell = PluginCell {
            version_id: 1,
            instance_pre,
            metadata: inspection.metadata,
            memory_max_pages: self.config.memory_max_pages,
            content_hash: new_content_hash,
            plugin_name,
        };

        let mut slots = self.slots.write();
        // The table may have changed since the read-locked check above, so the
        // kind and hash are examined again under the write lock.
        if let Some(existing) = slots.get(&slot_key) {
            if existing.kind != kind {
                return Err(WasmtimeRuntimeError::ModuleRejected {
                    reason: format!(
                        "slot {slot_key:?} already registered as `{:?}`; cannot reuse for `{:?}`",
                        existing.kind, kind
                    ),
                });
            }
            let prev = existing.current.load();
            if prev.content_hash != new_content_hash {
                // Same slot, new content: keep the slot and publish the next version.
                let bumped = PluginCell {
                    version_id: prev.version_id + 1,
                    ..new_cell
                };
                existing.current.store(Arc::new(bumped));
            }
            return Ok(Arc::clone(existing));
        }

        let slot = Arc::new(PluginSlot::new(name_string, kind, new_cell));
        slots.insert(slot_key, Arc::clone(&slot));
        Ok(slot)
    }

    /// Looks up the slot registered under `slot_key`, if any.
    pub fn get_slot(&self, slot_key: &SlotKey) -> Option<Arc<PluginSlot>> {
        self.slots.read().get(slot_key).cloned()
    }

    /// Removes the slot registered under `slot_key`.
    ///
    /// Returns `true` when a slot was removed (an info-level event is logged in
    /// that case) and `false` when the key was not registered.
    pub fn evict_slot(&self, slot_key: &SlotKey) -> bool {
        // The write guard is released at the end of this block, before the
        // logging and before the removed handle is dropped.
        let removed = {
            let mut slots = self.slots.write();
            slots.remove(slot_key)
        };
        let existed = removed.is_some();
        if existed {
            tracing::info!(
                target: "cc_lb_runtime_wasmtime::eviction",
                ?slot_key,
                "evicted plugin slot",
            );
        }
        drop(removed);
        existed
    }

    /// Removes every slot whose key is not in `keep` and returns the removed keys.
    ///
    /// One info-level event is logged per removed key. The returned keys are in
    /// the order the table's key iterator produced them.
    pub fn retain_slots(&self, keep: &HashSet<SlotKey>) -> Vec<SlotKey> {
        // Collect the keys and removed handles under the write lock; log and
        // drop them only after the guard is gone.
        let (evicted_keys, removed) = {
            let mut slots = self.slots.write();
            let orphan_keys: Vec<SlotKey> = slots
                .keys()
                .filter(|key| !keep.contains(*key))
                .cloned()
                .collect();
            let removed: Vec<Arc<PluginSlot>> = orphan_keys
                .iter()
                .filter_map(|key| slots.remove(key))
                .collect();
            (orphan_keys, removed)
        };
        for key in &evicted_keys {
            tracing::info!(
                target: "cc_lb_runtime_wasmtime::eviction",
                slot_key = ?key,
                "evicted orphan slot (retain_slots sweep)",
            );
        }
        drop(removed);
        evicted_keys
    }

    /// Resolves `slot_key`, checks that the slot has `expected_kind`, and calls
    /// `run` with the cell loaded from the slot at that moment.
    ///
    /// # Errors
    ///
    /// Returns `WasmtimeRuntimeError::ModuleRejected` when no slot is registered
    /// under `slot_key` or when the slot's kind differs from `expected_kind`;
    /// otherwise returns whatever `run` returns.
    fn dispatch<F>(
        &self,
        slot_key: &SlotKey,
        expected_kind: SlotKind,
        run: F,
    ) -> Result<Vec<u8>, WasmtimeRuntimeError>
    where
        F: FnOnce(&Arc<PluginCell>) -> Result<Vec<u8>, WasmtimeRuntimeError>,
    {
        let Some(slot) = self.get_slot(slot_key) else {
            return Err(WasmtimeRuntimeError::ModuleRejected {
                reason: format!("no slot registered for {slot_key:?}"),
            });
        };
        if slot.kind != expected_kind {
            // Both placeholders render the slot's registered kind; the kind the
            // caller asked for (`expected_kind`) is not part of the message.
            return Err(WasmtimeRuntimeError::ModuleRejected {
                reason: format!(
                    "slot {slot_key:?} is `{:?}`, callable as `{:?}` only",
                    slot.kind, slot.kind,
                ),
            });
        }
        let cell = slot.current.load_full();
        run(&cell)
    }

    /// Calls `call_filter_hook` on the current cell of the `Filter` slot
    /// registered under `slot_key`, passing `input` and returning its output bytes.
    ///
    /// # Errors
    ///
    /// Fails when the slot is missing or is not a `Filter` slot, or when the
    /// hook call itself fails.
    pub fn call_filter(
        &self,
        slot_key: &SlotKey,
        input: &[u8],
    ) -> Result<Vec<u8>, WasmtimeRuntimeError> {
        self.dispatch(slot_key, SlotKind::Filter, |cell| {
            call_filter_hook(cell, input)
        })
    }

    /// Calls `call_shape_hook` on the current cell of the `Shape` slot
    /// registered under `slot_key`, passing `input` and returning its output bytes.
    ///
    /// # Errors
    ///
    /// Fails when the slot is missing or is not a `Shape` slot, or when the
    /// hook call itself fails.
    pub fn call_shape(
        &self,
        slot_key: &SlotKey,
        input: &[u8],
    ) -> Result<Vec<u8>, WasmtimeRuntimeError> {
        self.dispatch(slot_key, SlotKind::Shape, |cell| {
            call_shape_hook(cell, input)
        })
    }

    /// Calls `call_observe_hook` on the current cell of the `Observe` slot
    /// registered under `slot_key`, passing `input` and returning its output bytes.
    ///
    /// # Errors
    ///
    /// Fails when the slot is missing or is not an `Observe` slot, or when the
    /// hook call itself fails.
    pub fn call_observe(
        &self,
        slot_key: &SlotKey,
        input: &[u8],
    ) -> Result<Vec<u8>, WasmtimeRuntimeError> {
        self.dispatch(slot_key, SlotKind::Observe, |cell| {
            call_observe_hook(cell, input)
        })
    }

    /// Number of slots currently registered.
    pub fn slot_count(&self) -> usize {
        self.slots.read().len()
    }

    /// Publishes two utilization gauges for the engine's pooling allocator.
    ///
    /// Each ratio is the live count reported by the engine divided by the
    /// configured pool total, clamped to `[0.0, 1.0]`. When the engine reports
    /// no pooling-allocator metrics, both gauges are set to `0.0`.
    pub fn publish_pool_metrics(&self) {
        let (memories_util, instances_util) = match self.engine.pooling_allocator_metrics() {
            Some(m) => {
                // `.max(1)` keeps the denominators non-zero for a zero-sized pool.
                let mem_denom = self.config.pool_total_memories.max(1) as f64;
                let inst_denom = self.config.pool_total_core_instances.max(1) as f64;
                let mem_util = (m.memories() as f64) / mem_denom;
                let inst_util = (m.core_instances() as f64) / inst_denom;
                (mem_util.clamp(0.0, 1.0), inst_util.clamp(0.0, 1.0))
            }
            None => (0.0, 0.0),
        };
        metrics::gauge!("cc_lb_plugin_pool_memories_utilization_ratio").set(memories_util);
        metrics::gauge!("cc_lb_plugin_pool_core_instances_utilization_ratio").set(instances_util);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default runtime builds successfully and starts with no slots.
    #[test]
    fn engine_build_only() {
        let rt = WasmtimeRuntime::with_defaults().expect("engine build");
        let _ = rt.engine();
        assert_eq!(rt.slot_count(), 0);
    }

    /// Calling a hook on an unregistered slot key fails with `ModuleRejected`.
    #[test]
    fn missing_slot_returns_error() {
        let rt = WasmtimeRuntime::with_defaults().expect("engine build");
        let err = rt
            .call_filter(&SlotKey::global("nonexistent"), &[])
            .expect_err("must fail on missing slot");
        // `matches!` only yields a bool; without `assert!` the variant check is
        // discarded and the test would pass for any error.
        assert!(
            matches!(err, WasmtimeRuntimeError::ModuleRejected { .. }),
            "expected ModuleRejected, got {err:?}",
        );
    }
}