#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used))]
#[cfg(feature = "opentelemetry")]
pub mod telemetry;
#[cfg(feature = "sqlite-store")]
use kernex_core::config::MemoryConfig;
use kernex_core::context::{CompactionStrategy, Context, ContextNeeds};
use kernex_core::error::KernexError;
use kernex_core::guardrails::{GuardrailAction, GuardrailRunner};
use kernex_core::hooks::{HookRunner, NoopHookRunner};
use kernex_core::message::{CompletionMeta, Request, Response};
use kernex_core::permissions::PermissionRules;
use kernex_core::run::{RunConfig, RunOutcome};
use kernex_core::stream::StreamEvent;
use kernex_core::traits::Provider;
use kernex_core::traits::StreamingProvider;
use kernex_core::traits::Summarizer;
#[cfg(feature = "sqlite-store")]
use kernex_memory::{Store, UsageBreakdown};
use kernex_skills::{
build_skill_prompt, match_skill_toolboxes, match_skill_triggers, Project, Skill,
};
use std::sync::Arc;
pub use kernex_core as core;
#[cfg(feature = "sqlite-store")]
pub use kernex_memory as memory;
pub use kernex_pipelines as pipelines;
pub use kernex_providers as providers;
pub use kernex_sandbox as sandbox;
pub use kernex_skills as skills;
pub struct Runtime {
#[cfg(feature = "sqlite-store")]
pub store: Store,
pub skills: Vec<Skill>,
pub projects: Vec<Project>,
pub data_dir: String,
pub system_prompt: String,
pub channel: String,
pub project: Option<String>,
pub hook_runner: Arc<dyn HookRunner>,
pub permission_rules: Option<Arc<PermissionRules>>,
pub guardrail_runner: Option<Arc<dyn GuardrailRunner>>,
pub auto_compact: bool,
}
struct ProviderSummarizer<'a> {
provider: &'a dyn Provider,
}
#[async_trait::async_trait]
impl Summarizer for ProviderSummarizer<'_> {
async fn summarize(&self, text: &str) -> Result<String, KernexError> {
let instruction = format!(
"You are a conversation summarizer. Summarize the following \
exchange in 200 words or fewer. Focus on: decisions made, files \
touched, errors encountered, and unresolved questions. Skip \
greetings and small talk. Output the summary only — no preamble.\n\n\
---\n{text}\n---"
);
let mut ctx = Context::new(&instruction);
ctx.system_prompt.clear();
let response = self.provider.complete(&ctx).await?;
Ok(response.text)
}
}
impl Runtime {
pub async fn complete(
&self,
provider: &dyn Provider,
request: &Request,
) -> Result<Response, KernexError> {
self.complete_with_needs(provider, request, &ContextNeeds::default())
.await
}
#[tracing::instrument(
name = "kernex.complete",
skip_all,
fields(provider = provider.name(), sender = %request.sender_id)
)]
pub async fn complete_with_needs(
&self,
provider: &dyn Provider,
request: &Request,
#[allow(unused_variables)] needs: &ContextNeeds,
) -> Result<Response, KernexError> {
let project_ref = self.project.as_deref();
let owned_req;
let request = if let Some(gr) = &self.guardrail_runner {
match gr.check_input(&request.text).await {
GuardrailAction::Allow => request,
GuardrailAction::Block(reason) => return Err(KernexError::Guardrail(reason)),
GuardrailAction::Sanitize(clean) => {
owned_req = Request {
text: clean,
..request.clone()
};
&owned_req
}
}
} else {
request
};
let skill_ctx = build_skill_prompt(&self.skills);
let full_system_prompt = if skill_ctx.prompt.is_empty() {
self.system_prompt.clone()
} else if self.system_prompt.is_empty() {
skill_ctx.prompt.clone()
} else {
format!("{}\n\n{}", self.system_prompt, skill_ctx.prompt)
};
#[cfg(feature = "sqlite-store")]
let mut context = {
let (effective_needs, summarizer): (
std::borrow::Cow<'_, ContextNeeds>,
Option<ProviderSummarizer<'_>>,
) = if self.auto_compact {
let mut owned = needs.clone();
owned.compact = CompactionStrategy::Summarize;
(
std::borrow::Cow::Owned(owned),
Some(ProviderSummarizer { provider }),
)
} else {
(std::borrow::Cow::Borrowed(needs), None)
};
self.store
.build_context(
&self.channel,
request,
&full_system_prompt,
&effective_needs,
project_ref,
summarizer.as_ref().map(|s| s as &dyn Summarizer),
)
.await?
};
#[cfg(not(feature = "sqlite-store"))]
let mut context = {
let mut ctx = kernex_core::context::Context::new(&request.text);
ctx.system_prompt = full_system_prompt;
ctx
};
if context.model.is_none() {
context.model = skill_ctx.model;
}
let mcp_servers = match_skill_triggers(&self.skills, &request.text);
if !mcp_servers.is_empty() {
context.mcp_servers = mcp_servers;
}
let toolboxes = match_skill_toolboxes(&self.skills, &request.text);
if !toolboxes.is_empty() {
context.toolboxes = toolboxes;
}
context.hook_runner = Some(self.hook_runner.clone());
context.permission_rules = self.permission_rules.clone();
let raw_response = provider.complete(&context).await?;
let response = if let Some(gr) = &self.guardrail_runner {
match gr.check_output(&raw_response.text).await {
GuardrailAction::Allow => raw_response,
GuardrailAction::Block(reason) => return Err(KernexError::Guardrail(reason)),
GuardrailAction::Sanitize(clean) => Response {
text: clean,
metadata: raw_response.metadata,
},
}
} else {
raw_response
};
#[allow(unused_variables)]
let project_key = project_ref.unwrap_or("default");
#[cfg(feature = "sqlite-store")]
self.store
.store_exchange(&self.channel, request, &response, project_key)
.await?;
#[cfg(feature = "sqlite-store")]
if let Some(tokens) = response.metadata.tokens_used {
let model = response.metadata.model.as_deref().unwrap_or("unknown");
let session = response.metadata.session_id.as_deref().unwrap_or("default");
let breakdown = UsageBreakdown {
input_tokens: response.metadata.input_tokens,
output_tokens: response.metadata.output_tokens,
cache_read_tokens: response.metadata.cache_read_tokens,
cache_creation_tokens: response.metadata.cache_creation_tokens,
};
if let Err(e) = self
.store
.record_usage_full(&request.sender_id, session, tokens, model, breakdown)
.await
{
tracing::warn!("failed to record token usage: {e}");
}
}
Ok(response)
}
pub async fn complete_stream(
&self,
provider: &dyn StreamingProvider,
request: &Request,
) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, KernexError> {
self.complete_stream_with_needs(provider, request, &ContextNeeds::default())
.await
}
#[tracing::instrument(
name = "kernex.stream",
skip_all,
fields(provider = provider.name(), sender = %request.sender_id)
)]
pub async fn complete_stream_with_needs(
&self,
provider: &dyn StreamingProvider,
request: &Request,
#[allow(unused_variables)] needs: &ContextNeeds,
) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, KernexError> {
let project_ref = self.project.as_deref();
let owned_req;
let request = if let Some(gr) = &self.guardrail_runner {
match gr.check_input(&request.text).await {
GuardrailAction::Allow => request,
GuardrailAction::Block(reason) => return Err(KernexError::Guardrail(reason)),
GuardrailAction::Sanitize(clean) => {
owned_req = Request {
text: clean,
..request.clone()
};
&owned_req
}
}
} else {
request
};
let skill_ctx = build_skill_prompt(&self.skills);
let full_system_prompt = if skill_ctx.prompt.is_empty() {
self.system_prompt.clone()
} else if self.system_prompt.is_empty() {
skill_ctx.prompt.clone()
} else {
format!("{}\n\n{}", self.system_prompt, skill_ctx.prompt)
};
#[cfg(feature = "sqlite-store")]
let mut context = {
let (effective_needs, summarizer): (
std::borrow::Cow<'_, ContextNeeds>,
Option<ProviderSummarizer<'_>>,
) = if self.auto_compact {
let mut owned = needs.clone();
owned.compact = CompactionStrategy::Summarize;
(
std::borrow::Cow::Owned(owned),
Some(ProviderSummarizer { provider }),
)
} else {
(std::borrow::Cow::Borrowed(needs), None)
};
self.store
.build_context(
&self.channel,
request,
&full_system_prompt,
&effective_needs,
project_ref,
summarizer.as_ref().map(|s| s as &dyn Summarizer),
)
.await?
};
#[cfg(not(feature = "sqlite-store"))]
let mut context = {
let mut ctx = kernex_core::context::Context::new(&request.text);
ctx.system_prompt = full_system_prompt;
ctx
};
if context.model.is_none() {
context.model = skill_ctx.model;
}
let mcp_servers = match_skill_triggers(&self.skills, &request.text);
if !mcp_servers.is_empty() {
context.mcp_servers = mcp_servers;
}
let toolboxes = match_skill_toolboxes(&self.skills, &request.text);
if !toolboxes.is_empty() {
context.toolboxes = toolboxes;
}
context.hook_runner = Some(self.hook_runner.clone());
context.permission_rules = self.permission_rules.clone();
let provider_name = provider.name().to_string();
let mut upstream = provider.complete_stream(&context).await?;
let (tx, rx) = tokio::sync::mpsc::channel::<StreamEvent>(64);
#[cfg(feature = "sqlite-store")]
let store = self.store.clone();
let channel = self.channel.clone();
let request_clone = request.clone();
#[allow(unused_variables)]
let project_key = project_ref.unwrap_or("default").to_string();
let guardrail_runner = self.guardrail_runner.clone();
tokio::spawn(async move {
use kernex_core::stream::{StreamAccumulator, StreamEvent as SE};
let mut acc = StreamAccumulator::new();
let started = std::time::Instant::now();
while let Some(event) = upstream.recv().await {
acc.push(&event);
let is_terminal = matches!(event, SE::Done | SE::Error(_));
let _ = tx.send(event).await;
if is_terminal {
break;
}
}
#[cfg(feature = "sqlite-store")]
{
let elapsed_ms = started.elapsed().as_millis() as u64;
let accumulated = acc.into_text();
let persisted_text = if let Some(gr) = &guardrail_runner {
match gr.check_output(&accumulated).await {
GuardrailAction::Allow => accumulated,
GuardrailAction::Block(_) => String::new(),
GuardrailAction::Sanitize(clean) => clean,
}
} else {
accumulated
};
let response = Response {
text: persisted_text,
metadata: CompletionMeta {
provider_used: provider_name,
tokens_used: None,
processing_time_ms: elapsed_ms,
model: None,
session_id: None,
..Default::default()
},
};
if let Err(e) = store
.store_exchange(&channel, &request_clone, &response, &project_key)
.await
{
tracing::warn!("failed to persist streaming exchange: {e}");
}
}
#[cfg(not(feature = "sqlite-store"))]
{
let _ = acc;
let _ = started;
let _ = provider_name;
let _ = guardrail_runner;
}
});
Ok(rx)
}
#[tracing::instrument(
name = "kernex.run",
skip_all,
fields(provider = provider.name(), sender = %request.sender_id, turns = config.max_turns)
)]
pub async fn run(
&self,
provider: &dyn Provider,
request: &Request,
config: &RunConfig,
) -> Result<RunOutcome, KernexError> {
let needs = ContextNeeds::default();
let project_ref = self.project.as_deref();
let owned_req;
let request = if let Some(gr) = &self.guardrail_runner {
match gr.check_input(&request.text).await {
GuardrailAction::Allow => request,
GuardrailAction::Block(reason) => return Err(KernexError::Guardrail(reason)),
GuardrailAction::Sanitize(clean) => {
owned_req = Request {
text: clean,
..request.clone()
};
&owned_req
}
}
} else {
request
};
let skill_ctx = build_skill_prompt(&self.skills);
let full_system_prompt = if skill_ctx.prompt.is_empty() {
self.system_prompt.clone()
} else if self.system_prompt.is_empty() {
skill_ctx.prompt.clone()
} else {
format!("{}\n\n{}", self.system_prompt, skill_ctx.prompt)
};
#[cfg(feature = "sqlite-store")]
let mut context = {
let (effective_needs, summarizer): (
std::borrow::Cow<'_, ContextNeeds>,
Option<ProviderSummarizer<'_>>,
) = if self.auto_compact {
let mut owned = needs.clone();
owned.compact = CompactionStrategy::Summarize;
(
std::borrow::Cow::Owned(owned),
Some(ProviderSummarizer { provider }),
)
} else {
(std::borrow::Cow::Borrowed(&needs), None)
};
self.store
.build_context(
&self.channel,
request,
&full_system_prompt,
&effective_needs,
project_ref,
summarizer.as_ref().map(|s| s as &dyn Summarizer),
)
.await?
};
#[cfg(not(feature = "sqlite-store"))]
let mut context = {
let mut ctx = kernex_core::context::Context::new(&request.text);
ctx.system_prompt = full_system_prompt;
ctx
};
if context.model.is_none() {
context.model = skill_ctx.model;
}
let mcp_servers = match_skill_triggers(&self.skills, &request.text);
if !mcp_servers.is_empty() {
context.mcp_servers = mcp_servers;
}
let toolboxes = match_skill_toolboxes(&self.skills, &request.text);
if !toolboxes.is_empty() {
context.toolboxes = toolboxes;
}
context.max_turns = Some(config.max_turns);
context.hook_runner = Some(self.hook_runner.clone());
context.permission_rules = self.permission_rules.clone();
let raw_response = provider.complete(&context).await?;
let response = if let Some(gr) = &self.guardrail_runner {
match gr.check_output(&raw_response.text).await {
GuardrailAction::Allow => raw_response,
GuardrailAction::Block(reason) => return Err(KernexError::Guardrail(reason)),
GuardrailAction::Sanitize(clean) => Response {
text: clean,
metadata: raw_response.metadata,
},
}
} else {
raw_response
};
self.hook_runner.on_stop(&response.text).await;
#[allow(unused_variables)]
let project_key = project_ref.unwrap_or("default");
#[cfg(feature = "sqlite-store")]
self.store
.store_exchange(&self.channel, request, &response, project_key)
.await?;
#[cfg(feature = "sqlite-store")]
if let Some(tokens) = response.metadata.tokens_used {
let model = response.metadata.model.as_deref().unwrap_or("unknown");
let session = response.metadata.session_id.as_deref().unwrap_or("default");
let breakdown = UsageBreakdown {
input_tokens: response.metadata.input_tokens,
output_tokens: response.metadata.output_tokens,
cache_read_tokens: response.metadata.cache_read_tokens,
cache_creation_tokens: response.metadata.cache_creation_tokens,
};
if let Err(e) = self
.store
.record_usage_full(&request.sender_id, session, tokens, model, breakdown)
.await
{
tracing::warn!("failed to record token usage: {e}");
}
}
Ok(RunOutcome::EndTurn(response))
}
}
pub struct RuntimeBuilder {
data_dir: String,
#[cfg(feature = "sqlite-store")]
db_path: Option<String>,
system_prompt: String,
channel: String,
project: Option<String>,
hook_runner: Option<Arc<dyn HookRunner>>,
permission_rules: Option<Arc<PermissionRules>>,
guardrail_runner: Option<Arc<dyn GuardrailRunner>>,
auto_compact: bool,
}
impl RuntimeBuilder {
pub fn new() -> Self {
Self {
data_dir: "~/.kernex".to_string(),
#[cfg(feature = "sqlite-store")]
db_path: None,
system_prompt: String::new(),
channel: "cli".to_string(),
project: None,
hook_runner: None,
permission_rules: None,
guardrail_runner: None,
auto_compact: false,
}
}
pub fn from_file(path: &str) -> Result<Self, kernex_core::error::KernexError> {
let config = kernex_core::config::load_file(path)?;
Ok(Self::from_config(&config))
}
pub fn from_config(config: &kernex_core::config::KernexConfig) -> Self {
let mut builder = Self::new()
.data_dir(&config.runtime.data_dir)
.system_prompt(&config.runtime.system_prompt)
.channel(&config.runtime.channel);
if let Some(proj) = &config.runtime.project {
builder = builder.project(proj);
}
#[cfg(feature = "sqlite-store")]
{
builder = builder.db_path(&config.memory.db_path);
}
builder
}
pub fn from_env() -> Self {
let mut builder = Self::new();
if let Ok(dir) = std::env::var("KERNEX_DATA_DIR") {
warn_if_data_dir_unusual(&dir);
builder = builder.data_dir(&dir);
}
#[cfg(feature = "sqlite-store")]
if let Ok(path) = std::env::var("KERNEX_DB_PATH") {
builder = builder.db_path(&path);
}
if let Ok(prompt) = std::env::var("KERNEX_SYSTEM_PROMPT") {
builder = builder.system_prompt(&prompt);
}
if let Ok(channel) = std::env::var("KERNEX_CHANNEL") {
builder = builder.channel(&channel);
}
if let Ok(project) = std::env::var("KERNEX_PROJECT") {
builder = builder.project(&project);
}
builder
}
pub fn data_dir(mut self, path: &str) -> Self {
self.data_dir = path.to_string();
self
}
#[cfg(feature = "sqlite-store")]
pub fn db_path(mut self, path: &str) -> Self {
self.db_path = Some(path.to_string());
self
}
pub fn system_prompt(mut self, prompt: &str) -> Self {
self.system_prompt = prompt.to_string();
self
}
pub fn channel(mut self, channel: &str) -> Self {
self.channel = channel.to_string();
self
}
pub fn project(mut self, project: &str) -> Self {
self.project = Some(project.to_string());
self
}
pub fn hook_runner(mut self, runner: Arc<dyn HookRunner>) -> Self {
self.hook_runner = Some(runner);
self
}
pub fn permission_rules(mut self, rules: PermissionRules) -> Self {
self.permission_rules = Some(Arc::new(rules));
self
}
pub fn guardrail_runner(mut self, runner: Arc<dyn GuardrailRunner>) -> Self {
self.guardrail_runner = Some(runner);
self
}
pub fn auto_compact(mut self, enable: bool) -> Self {
self.auto_compact = enable;
self
}
pub async fn build(self) -> Result<Runtime, KernexError> {
let expanded_dir = kernex_core::shellexpand(&self.data_dir);
tokio::fs::create_dir_all(&expanded_dir)
.await
.map_err(|e| KernexError::Config(format!("failed to create data dir: {e}")))?;
#[cfg(feature = "sqlite-store")]
let store = {
let db_path = self
.db_path
.unwrap_or_else(|| format!("{expanded_dir}/memory.db"));
let mem_config = MemoryConfig {
db_path: db_path.clone(),
..Default::default()
};
Store::new(&mem_config).await?
};
let skills_data_dir = self.data_dir.clone();
let skills =
tokio::task::spawn_blocking(move || kernex_skills::load_skills(&skills_data_dir))
.await
.map_err(|e| {
KernexError::skill(kernex_skills::SkillError::Logic(format!(
"load_skills task failed: {e}"
)))
})?;
let projects_data_dir = self.data_dir.clone();
let projects =
tokio::task::spawn_blocking(move || kernex_skills::load_projects(&projects_data_dir))
.await
.map_err(|e| {
KernexError::skill(kernex_skills::SkillError::Logic(format!(
"load_projects task failed: {e}"
)))
})?;
tracing::info!(
"runtime initialized: {} skills, {} projects",
skills.len(),
projects.len()
);
let hook_runner: Arc<dyn HookRunner> =
self.hook_runner.unwrap_or_else(|| Arc::new(NoopHookRunner));
Ok(Runtime {
#[cfg(feature = "sqlite-store")]
store,
skills,
projects,
data_dir: expanded_dir,
system_prompt: self.system_prompt,
channel: self.channel,
project: self.project,
hook_runner,
permission_rules: self.permission_rules,
guardrail_runner: self.guardrail_runner,
auto_compact: self.auto_compact,
})
}
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
fn warn_if_data_dir_unusual(dir: &str) {
let path = std::path::Path::new(dir);
if !path.is_absolute() {
return;
}
let s = dir;
let in_home = std::env::var("HOME")
.ok()
.map(|h| !h.is_empty() && s.starts_with(&h))
.unwrap_or(false);
let usual = in_home
|| s.starts_with("/tmp/")
|| s.starts_with("/var/")
|| s.starts_with("/Users/")
|| s.starts_with("/home/")
|| s == "/tmp"
|| s == "/var";
if !usual {
tracing::warn!(
data_dir = %dir,
"KERNEX_DATA_DIR resolves outside $HOME / /tmp / /var — \
writes may land in unexpected locations"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_runtime_builder_creates_runtime() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp = tmp_dir.path();
let runtime = RuntimeBuilder::new()
.data_dir(tmp.to_str().unwrap())
.build()
.await
.unwrap();
assert!(runtime.skills.is_empty());
assert!(runtime.projects.is_empty());
assert!(runtime.system_prompt.is_empty());
assert_eq!(runtime.channel, "cli");
assert!(runtime.project.is_none());
assert!(std::path::Path::new(&runtime.data_dir).exists());
}
#[tokio::test]
async fn test_runtime_builder_custom_db_path() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp = tmp_dir.path();
let db = tmp.join("custom.db");
let runtime = RuntimeBuilder::new()
.data_dir(tmp.to_str().unwrap())
.db_path(db.to_str().unwrap())
.build()
.await
.unwrap();
assert!(db.exists());
drop(runtime);
}
#[tokio::test]
async fn test_runtime_builder_with_config() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp = tmp_dir.path();
let runtime = RuntimeBuilder::new()
.data_dir(tmp.to_str().unwrap())
.system_prompt("You are helpful.")
.channel("api")
.project("my-project")
.build()
.await
.unwrap();
assert_eq!(runtime.system_prompt, "You are helpful.");
assert_eq!(runtime.channel, "api");
assert_eq!(runtime.project, Some("my-project".to_string()));
}
#[tokio::test]
async fn test_runtime_builder_from_config() {
use kernex_core::config::{KernexConfig, MemoryConfig, RuntimeConfig};
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp = tmp_dir.path();
let cfg = KernexConfig {
runtime: RuntimeConfig {
name: "test-agent".to_string(),
data_dir: tmp.to_str().unwrap().to_string(),
channel: "slack".to_string(),
project: Some("my-proj".to_string()),
system_prompt: "Be concise.".to_string(),
..RuntimeConfig::default()
},
memory: MemoryConfig {
db_path: tmp.join("memory.db").to_str().unwrap().to_string(),
..MemoryConfig::default()
},
..KernexConfig::default()
};
let runtime = RuntimeBuilder::from_config(&cfg).build().await.unwrap();
assert_eq!(runtime.channel, "slack");
assert_eq!(runtime.project, Some("my-proj".to_string()));
assert_eq!(runtime.system_prompt, "Be concise.");
}
#[tokio::test]
async fn test_runtime_builder_from_file_toml() {
use std::io::Write;
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp = tmp_dir.path();
let escaped = tmp.to_str().unwrap().replace('\\', "\\\\");
let cfg_path = tmp.join("agent.toml");
let mut f = std::fs::File::create(&cfg_path).unwrap();
writeln!(
f,
r#"[runtime]
name = "file-agent"
data_dir = "{escaped}"
channel = "api"
project = "file-proj"
system_prompt = "From file."
[memory]
db_path = "{escaped}/memory.db"
"#
)
.unwrap();
let runtime = RuntimeBuilder::from_file(cfg_path.to_str().unwrap())
.unwrap()
.build()
.await
.unwrap();
assert_eq!(runtime.channel, "api");
assert_eq!(runtime.project, Some("file-proj".to_string()));
assert_eq!(runtime.system_prompt, "From file.");
}
}