use crate::daemon::Daemon;
use crate::daemon::{cron::CronEntry, event::EventSubscription};
use anyhow::Result;
use crabllm_core::Provider;
use runtime::Env;
use std::collections::VecDeque;
use std::io::{BufRead, BufReader};
use wcore::protocol::message::*;
/// Liveness probe: performs no work and always returns `Ok(())`.
pub(super) async fn ping() -> Result<()> {
    Ok(())
}
/// Asks the daemon to reload itself.
///
/// This is a thin wrapper: it awaits [`Daemon::reload`] and forwards any
/// error it reports unchanged.
pub(super) async fn reload<P: Provider + 'static>(node: &Daemon<P>) -> Result<()> {
    node.reload().await?;
    Ok(())
}
pub(super) async fn get_stats<P: Provider + 'static>(node: &Daemon<P>) -> Result<DaemonStats> {
let rt = node.runtime.read().await.clone();
let active = rt.conversation_count().await;
let agents = rt.agents().len() as u32;
let uptime = node.started_at.elapsed().as_secs();
let active_model = super::config::active_model(node).await;
Ok(DaemonStats {
uptime_secs: uptime,
active_conversations: active as u32,
registered_agents: agents,
active_model,
})
}
pub(super) async fn create_cron<P: Provider + 'static>(
node: &Daemon<P>,
req: CreateCronMsg,
) -> Result<CronInfo> {
let rt = node.runtime.read().await.clone();
if rt.agent(&req.agent).is_none() {
anyhow::bail!("agent '{}' not found", req.agent);
}
let entry = CronEntry {
id: 0,
schedule: req.schedule,
skill: req.skill,
agent: req.agent,
sender: req.sender,
quiet_start: req.quiet_start,
quiet_end: req.quiet_end,
once: req.once,
};
let created = node
.crons
.lock()
.await
.create(entry, node.crons.clone())
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(cron_entry_to_info(&created))
}
/// Deletes the cron entry with the given `id`.
///
/// Returns the boolean reported by the cron store's `delete` (presumably
/// `true` when an entry was removed — confirm against the store).
pub(super) async fn delete_cron<P: Provider + 'static>(node: &Daemon<P>, id: u64) -> Result<bool> {
    let removed = node.crons.lock().await.delete(id);
    Ok(removed)
}
/// Lists every cron entry currently held by the cron store, converted to the
/// wire-level `CronInfo` representation.
pub(super) async fn list_crons<P: Provider + 'static>(node: &Daemon<P>) -> Result<CronList> {
    // Take the list first so the lock is released before converting.
    let entries = node.crons.lock().await.list();
    let crons = entries.iter().map(cron_entry_to_info).collect();
    Ok(CronList { crons })
}
pub(super) fn subscribe_events<P: Provider + 'static>(
node: &Daemon<P>,
) -> impl futures_core::Stream<Item = Result<AgentEventMsg>> + Send {
let runtime = node.runtime.clone();
async_stream::try_stream! {
let rt = runtime.read().await.clone();
let Some(mut rx) = rt.env.subscribe_events() else {
return;
};
loop {
match rx.recv().await {
Ok(event) => yield event,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
}
}
}
}
pub(super) async fn subscribe_event<P: Provider + 'static>(
node: &Daemon<P>,
req: SubscribeEventMsg,
) -> Result<SubscriptionInfo> {
let rt = node.runtime.read().await.clone();
if rt.agent(&req.target_agent).is_none() {
anyhow::bail!("agent '{}' not found", req.target_agent);
}
let sub = EventSubscription {
id: 0,
source: req.source,
target_agent: req.target_agent,
once: req.once,
};
let created = node.events.lock().subscribe(sub);
Ok(subscription_to_info(&created))
}
/// Removes the event subscription with the given `id`.
///
/// Returns the boolean reported by the event bus's `unsubscribe` (presumably
/// `true` when a subscription was removed — confirm against the bus).
pub(super) async fn unsubscribe_event<P: Provider + 'static>(
    node: &Daemon<P>,
    id: u64,
) -> Result<bool> {
    let removed = node.events.lock().unsubscribe(id);
    Ok(removed)
}
/// Lists every event subscription currently registered, converted to the
/// wire-level `SubscriptionInfo` representation.
pub(super) async fn list_subscriptions<P: Provider + 'static>(
    node: &Daemon<P>,
) -> Result<SubscriptionList> {
    // Take the list first so the lock is released before converting.
    let subs = node.events.lock().list();
    let subscriptions = subs.iter().map(subscription_to_info).collect();
    Ok(SubscriptionList { subscriptions })
}
pub(super) async fn publish_event<P: Provider + 'static>(
node: &Daemon<P>,
req: PublishEventMsg,
) -> Result<()> {
node.events.lock().publish(&req.source, &req.payload);
Ok(())
}
/// Installs the plugin-provided command service called `name`.
///
/// Steps, in order:
/// 1. look the command up in the installed plugin manifests,
/// 2. refuse to continue if the service label is already installed and
///    `force` is `false`,
/// 3. locate the binary named by the command's `krate`,
/// 4. render the service template and install it under the label
///    `ai.crabtalk.<name>`.
///
/// # Errors
///
/// Fails if the command is unknown, if the service is already installed and
/// `force` is not set, if the binary cannot be found, or if installation
/// itself fails.
pub(super) async fn start_service<P: Provider + 'static>(
    node: &Daemon<P>,
    name: String,
    force: bool,
) -> Result<()> {
    let cmd = find_command_service(node, &name)?;
    let label = format!("ai.crabtalk.{name}");

    // Only query the installation state when `force` is not set.
    let blocked = !force && command::service::is_installed(&label);
    if blocked {
        anyhow::bail!("service '{name}' is already running, use force to restart");
    }

    let binary = find_binary(&cmd.krate)?;
    let service = CommandService {
        name: name.clone(),
        description: cmd.description.clone(),
        label: label.clone(),
    };
    let rendered = command::service::render_service_template(&service, &binary);
    command::service::install(&rendered, &label)
}
/// Uninstalls the service labelled `ai.crabtalk.<name>` and then removes the
/// service's port file.
///
/// Removing the port file is best-effort: a failure there (for example, the
/// file not existing) is deliberately ignored.
///
/// # Errors
///
/// Fails only if uninstalling the service fails.
pub(super) async fn stop_service(name: String) -> Result<()> {
    command::service::uninstall(&format!("ai.crabtalk.{name}"))?;
    let port_file = wcore::paths::service_port_file(&name);
    let _ = std::fs::remove_file(port_file);
    Ok(())
}
/// Returns the last lines of the log file belonging to service `name`.
///
/// `lines` is the maximum number of trailing lines to return; `0` selects the
/// default of 50. The selected lines are joined with `\n` (line terminators
/// `\n` / `\r\n` are stripped, and there is no trailing newline).
///
/// If the log file does not exist, the function succeeds with a
/// `no logs yet: <path>` message rather than an error.
///
/// Bytes that are not valid UTF-8 are replaced with U+FFFD instead of failing
/// the whole request, since log output is not guaranteed to be valid text.
///
/// # Errors
///
/// Fails if the file exists but cannot be opened, or if reading it fails.
pub(super) async fn service_logs(name: String, lines: u32) -> Result<String> {
    // Upper bound on the ring buffer's *up-front* allocation. `lines` comes
    // from the caller, so reserving it verbatim would let one request demand
    // a multi-gigabyte allocation. The buffer still grows on demand, and it
    // never holds more than `n` lines.
    const MAX_PREALLOC_LINES: usize = 1024;

    let path = wcore::paths::service_log_path(&name);
    // Open directly instead of probing with `exists()` first, so a file that
    // disappears between the check and the open is still reported as
    // "no logs yet".
    let file = match std::fs::File::open(&path) {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok(format!("no logs yet: {}", path.display()));
        }
        Err(e) => {
            return Err(anyhow::anyhow!("failed to open {}: {e}", path.display()));
        }
    };

    let n = if lines == 0 { 50 } else { lines as usize };
    let mut tail: VecDeque<String> = VecDeque::with_capacity(n.min(MAX_PREALLOC_LINES));
    let mut reader = BufReader::new(file);
    let mut raw = Vec::new();
    loop {
        raw.clear();
        if reader.read_until(b'\n', &mut raw)? == 0 {
            break;
        }
        // Strip the terminator the same way `BufRead::lines` does.
        if raw.last() == Some(&b'\n') {
            raw.pop();
            if raw.last() == Some(&b'\r') {
                raw.pop();
            }
        }
        // Keep only the newest `n` lines.
        if tail.len() == n {
            tail.pop_front();
        }
        tail.push_back(String::from_utf8_lossy(&raw).into_owned());
    }
    Ok(tail.make_contiguous().join("\n"))
}
/// Looks up the command named `name` across the plugin manifests found under
/// `node.config_dir` and returns a clone of the first match.
///
/// # Errors
///
/// Fails with `command service '<name>' not found in installed plugins` when
/// no scanned manifest declares a command with that name.
fn find_command_service<P: Provider + 'static>(
    node: &Daemon<P>,
    name: &str,
) -> Result<plugin::manifest::CommandConfig> {
    let manifests = super::plugin::scan_plugin_manifests(&node.config_dir);
    IntoIterator::into_iter(manifests)
        .find_map(|(_, manifest)| manifest.commands.get(name).cloned())
        .ok_or_else(|| anyhow::anyhow!("command service '{name}' not found in installed plugins"))
}
/// Locates the executable called `name`.
///
/// Search order:
/// 1. every directory listed in the `PATH` environment variable, in order;
/// 2. `.cargo/bin` next to the parent directory of `wcore::paths::CONFIG_DIR`
///    (the error message calls this `~/.cargo/bin`, so this assumes the
///    config directory sits directly under the user's home — TODO confirm).
///
/// Only regular files (after following symlinks) are accepted, so a directory
/// that happens to share the binary's name is never returned. `PATH` is read
/// as an OS string, so the search still works when it contains entries that
/// are not valid Unicode.
///
/// # Errors
///
/// Fails with `binary '<name>' not found in PATH or ~/.cargo/bin` when no
/// candidate exists.
fn find_binary(name: &str) -> Result<std::path::PathBuf> {
    let on_path = std::env::var_os("PATH").and_then(|path_var| {
        std::env::split_paths(&path_var)
            .map(|dir| dir.join(name))
            .find(|candidate| candidate.is_file())
    });
    if let Some(found) = on_path {
        return Ok(found);
    }

    let cargo_bin = wcore::paths::CONFIG_DIR
        .parent()
        // A config dir with no parent falls back to the filesystem root.
        .unwrap_or(std::path::Path::new("/"))
        .join(".cargo/bin")
        .join(name);
    if cargo_bin.is_file() {
        return Ok(cargo_bin);
    }
    anyhow::bail!("binary '{name}' not found in PATH or ~/.cargo/bin")
}
/// Describes a plugin-provided command service. It is handed to
/// `command::service::render_service_template` through its
/// `command::service::Service` implementation.
struct CommandService {
    // Service name, as passed to `start_service`.
    name: String,
    // Description copied from the command's plugin manifest entry.
    description: String,
    // Service label; `start_service` builds it as `ai.crabtalk.<name>`.
    label: String,
}
/// Exposes the stored fields through the `Service` trait; each accessor
/// simply borrows the corresponding field.
impl command::service::Service for CommandService {
    /// The service name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The service description.
    fn description(&self) -> &str {
        self.description.as_str()
    }

    /// The service label.
    fn label(&self) -> &str {
        self.label.as_str()
    }
}
fn cron_entry_to_info(e: &CronEntry) -> CronInfo {
CronInfo {
id: e.id,
schedule: e.schedule.clone(),
skill: e.skill.clone(),
agent: e.agent.clone(),
quiet_start: e.quiet_start.clone().unwrap_or_default(),
quiet_end: e.quiet_end.clone().unwrap_or_default(),
once: e.once,
sender: e.sender.clone(),
}
}
fn subscription_to_info(sub: &EventSubscription) -> SubscriptionInfo {
SubscriptionInfo {
id: sub.id,
source: sub.source.clone(),
target_agent: sub.target_agent.clone(),
once: sub.once,
}
}