use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::io::Write;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use axum::{
body::Body,
extract::{Json, State},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
routing::{get, post},
Router,
};
use bytes::Bytes;
use chrono::{DateTime, Local, TimeZone};
use chrono_english::{parse_date_string, Dialect};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use futures::StreamExt;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use time::{Date, PrimitiveDateTime, Time, UtcOffset};
use crate::client::ButterflyBot;
use crate::config::Config;
use crate::config_store;
use crate::error::{ButterflyBotError, Result};
use crate::factories::agent_factory::load_markdown_content;
use crate::inbox_fsm::{InboxAction, InboxState};
use crate::inbox_state::InboxStateStore;
use crate::interfaces::scheduler::ScheduledJob;
use crate::planning::{resolve_plan_db_path, PlanStore};
use crate::reminders::{resolve_reminder_db_path, ReminderStore};
use crate::sandbox::{SandboxSettings, ToolRuntime};
use crate::scheduler::Scheduler;
use crate::security::policy::SigningIntent;
use crate::security::signer_daemon::{SignerRequest, SignerService};
use crate::security::solana_rpc_policy::SolanaRpcExecutionPolicy;
use crate::security::x402::canonicalize_payment_required;
use crate::services::agent::UiEvent;
use crate::services::query::{OutputFormat, ProcessOptions, ProcessResult, UserInput};
use crate::tasks::{resolve_task_db_path, TaskStore};
use crate::todo::{resolve_todo_db_path, TodoStore};
use crate::vault;
use crate::wakeup::WakeupStore;
use tokio::sync::{broadcast, RwLock};
#[derive(Clone)]
pub struct AppState {
pub agent: Arc<RwLock<Arc<ButterflyBot>>>,
pub reminder_store: Arc<ReminderStore>,
pub signer_service: SignerService,
pub token: String,
pub ui_event_tx: broadcast::Sender<UiEvent>,
pub db_path: String,
}
static AUTONOMY_LAST_RUN_TS: AtomicI64 = AtomicI64::new(0);
static AUTONOMY_COOLDOWN_SECS: AtomicI64 = AtomicI64::new(60);
fn set_autonomy_cooldown_seconds(seconds: u64) {
AUTONOMY_COOLDOWN_SECS.store(seconds.max(1) as i64, Ordering::Relaxed);
}
fn try_begin_autonomy_tick(now_ts: i64) -> Option<i64> {
loop {
let cooldown = AUTONOMY_COOLDOWN_SECS.load(Ordering::Relaxed).max(1);
let last = AUTONOMY_LAST_RUN_TS.load(Ordering::Relaxed);
if last > 0 {
let elapsed = now_ts.saturating_sub(last);
if elapsed < cooldown {
return Some(cooldown - elapsed);
}
}
if AUTONOMY_LAST_RUN_TS
.compare_exchange(last, now_ts, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
return None;
}
}
}
struct BrainTickJob {
agent: Arc<RwLock<Arc<ButterflyBot>>>,
interval: Duration,
}
#[async_trait::async_trait]
impl ScheduledJob for BrainTickJob {
fn name(&self) -> &str {
"brain_tick"
}
fn interval(&self) -> Duration {
self.interval
}
async fn run(&self) -> Result<()> {
let agent = self.agent.read().await.clone();
agent.brain_tick().await;
Ok(())
}
}
struct WakeupJob {
agent: Arc<RwLock<Arc<ButterflyBot>>>,
store: Arc<WakeupStore>,
interval: Duration,
ui_event_tx: broadcast::Sender<UiEvent>,
audit_log_path: Option<String>,
heartbeat_source: crate::config::MarkdownSource,
db_path: String,
}
struct ScheduledTasksJob {
agent: Arc<RwLock<Arc<ButterflyBot>>>,
store: Arc<TaskStore>,
interval: Duration,
ui_event_tx: broadcast::Sender<UiEvent>,
audit_log_path: Option<String>,
}
struct ReminderDispatchJob {
store: Arc<ReminderStore>,
interval: Duration,
ui_event_tx: broadcast::Sender<UiEvent>,
audit_log_path: Option<String>,
}
#[async_trait::async_trait]
impl ScheduledJob for ScheduledTasksJob {
fn name(&self) -> &str {
"scheduled_tasks"
}
fn interval(&self) -> Duration {
self.interval
}
async fn run(&self) -> Result<()> {
let now = now_ts();
let tasks = self.store.list_due(now, 32).await?;
for task in tasks {
let agent = self.agent.read().await.clone();
let run_at = now_ts();
let next_run_at = if let Some(interval) = task.interval_minutes {
run_at + interval.max(1) * 60
} else {
run_at
};
if task.interval_minutes.is_some() {
let _ = self.store.mark_run(task.id, run_at, next_run_at).await;
} else {
let _ = self.store.complete_one_shot(task.id).await;
}
let options = ProcessOptions {
prompt: None,
images: Vec::new(),
output_format: OutputFormat::Text,
image_detail: "auto".to_string(),
json_schema: None,
};
let input = format!("Scheduled task '{}': {}", task.name, task.prompt);
let result = agent
.process(&task.user_id, UserInput::Text(input), options)
.await;
let (status, payload): (String, serde_json::Value) = match result {
Ok(ProcessResult::Text(text)) => (
"ok".to_string(),
json!({"task_id": task.id, "name": task.name, "output": text}),
),
Ok(other) => (
"ok".to_string(),
json!({"task_id": task.id, "name": task.name, "output": format!("{other:?}")}),
),
Err(err) => (
"error".to_string(),
json!({"task_id": task.id, "name": task.name, "error": err.to_string()}),
),
};
let event = UiEvent {
event_type: "tasks".to_string(),
user_id: task.user_id.clone(),
tool: "tasks".to_string(),
status: status.clone(),
payload: payload.clone(),
timestamp: run_at,
};
let _ = self.ui_event_tx.send(event);
let _ = write_tasks_audit_log(
self.audit_log_path.as_deref(),
run_at,
&task,
status.as_str(),
payload,
);
}
Ok(())
}
}
#[async_trait::async_trait]
impl ScheduledJob for ReminderDispatchJob {
fn name(&self) -> &str {
"reminder_dispatch"
}
fn interval(&self) -> Duration {
self.interval
}
async fn run(&self) -> Result<()> {
let now = now_ts();
let due = self.store.peek_due_reminders_all(now, 32).await?;
for reminder in due {
let title = reminder.item.title.clone();
let base_payload = json!({
"id": reminder.item.id,
"title": title,
"due_at": reminder.item.due_at,
});
let _ = self.ui_event_tx.send(UiEvent {
event_type: "reminder_delivery".to_string(),
user_id: reminder.user_id.clone(),
tool: "reminders".to_string(),
status: "queued".to_string(),
payload: base_payload.clone(),
timestamp: now,
});
let _ = write_reminder_audit_log(
self.audit_log_path.as_deref(),
now,
&reminder.user_id,
reminder.item.id,
"queued",
base_payload.clone(),
);
let _ = self.ui_event_tx.send(UiEvent {
event_type: "reminder".to_string(),
user_id: reminder.user_id.clone(),
tool: "reminders".to_string(),
status: "due".to_string(),
payload: base_payload.clone(),
timestamp: now,
});
let _ = self.ui_event_tx.send(UiEvent {
event_type: "reminder_delivery".to_string(),
user_id: reminder.user_id.clone(),
tool: "reminders".to_string(),
status: "delivery_attempted".to_string(),
payload: base_payload.clone(),
timestamp: now,
});
let _ = write_reminder_audit_log(
self.audit_log_path.as_deref(),
now,
&reminder.user_id,
reminder.item.id,
"delivery_attempted",
base_payload.clone(),
);
let delivered = send_desktop_notification("Butterfly Bot reminder", &title);
if delivered {
let _ = self
.store
.mark_fired_reminder(&reminder.user_id, reminder.item.id, now)
.await;
let _ = self.ui_event_tx.send(UiEvent {
event_type: "reminder_delivery".to_string(),
user_id: reminder.user_id.clone(),
tool: "reminders".to_string(),
status: "delivered".to_string(),
payload: base_payload.clone(),
timestamp: now,
});
let _ = write_reminder_audit_log(
self.audit_log_path.as_deref(),
now,
&reminder.user_id,
reminder.item.id,
"delivered",
base_payload.clone(),
);
} else {
let _ = self.ui_event_tx.send(UiEvent {
event_type: "reminder_delivery".to_string(),
user_id: reminder.user_id.clone(),
tool: "reminders".to_string(),
status: "delivery_failed".to_string(),
payload: base_payload,
timestamp: now,
});
let _ = write_reminder_audit_log(
self.audit_log_path.as_deref(),
now,
&reminder.user_id,
reminder.item.id,
"delivery_failed",
json!({
"id": reminder.item.id,
"title": reminder.item.title,
"due_at": reminder.item.due_at,
}),
);
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl ScheduledJob for WakeupJob {
fn name(&self) -> &str {
"wakeup"
}
fn interval(&self) -> Duration {
self.interval
}
async fn run(&self) -> Result<()> {
let now = now_ts();
let dynamic_source = Config::from_store(&self.db_path)
.ok()
.map(|cfg| cfg.heartbeat_source)
.unwrap_or_else(|| self.heartbeat_source.clone());
let prompt_source = Config::from_store(&self.db_path)
.ok()
.map(|cfg| cfg.prompt_source);
match load_markdown_content(&dynamic_source).await {
Ok(markdown) => {
let agent = self.agent.read().await.clone();
agent.set_heartbeat_markdown(markdown.clone()).await;
let status = if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok"
} else {
"empty"
};
let event = UiEvent {
event_type: "wakeup".to_string(),
user_id: "system".to_string(),
tool: "heartbeat".to_string(),
status: status.to_string(),
payload: json!({"source": dynamic_source}),
timestamp: now_ts(),
};
let _ = self.ui_event_tx.send(event);
}
Err(err) => {
let event = UiEvent {
event_type: "wakeup".to_string(),
user_id: "system".to_string(),
tool: "heartbeat".to_string(),
status: "error".to_string(),
payload: json!({"source": dynamic_source, "error": err.to_string()}),
timestamp: now_ts(),
};
let _ = self.ui_event_tx.send(event);
}
}
if let Some(source) = &prompt_source {
match load_markdown_content(source).await {
Ok(markdown) => {
let agent = self.agent.read().await.clone();
agent.set_prompt_markdown(markdown.clone()).await;
let status = if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok"
} else {
"empty"
};
let event = UiEvent {
event_type: "wakeup".to_string(),
user_id: "system".to_string(),
tool: "prompt".to_string(),
status: status.to_string(),
payload: json!({"source": source}),
timestamp: now_ts(),
};
let _ = self.ui_event_tx.send(event);
}
Err(err) => {
let event = UiEvent {
event_type: "wakeup".to_string(),
user_id: "system".to_string(),
tool: "prompt".to_string(),
status: "error".to_string(),
payload: json!({"source": source, "error": err.to_string()}),
timestamp: now_ts(),
};
let _ = self.ui_event_tx.send(event);
}
}
}
{
let agent = self.agent.read().await.clone();
let ui_event_tx = self.ui_event_tx.clone();
tokio::spawn(async move {
run_autonomy_tick(agent, ui_event_tx, "system".to_string(), "wakeup").await;
});
}
let tasks = self.store.list_due(now, 32).await?;
for task in tasks {
let agent = self.agent.read().await.clone();
let run_at = now_ts();
let next_run_at = run_at + task.interval_minutes.max(1) * 60;
let _ = self.store.mark_run(task.id, run_at, next_run_at).await;
let options = ProcessOptions {
prompt: None,
images: Vec::new(),
output_format: OutputFormat::Text,
image_detail: "auto".to_string(),
json_schema: None,
};
let input = format!("Wakeup task '{}': {}", task.name, task.prompt);
let result = agent
.process(&task.user_id, UserInput::Text(input), options)
.await;
let (status, payload): (String, Value) = match result {
Ok(ProcessResult::Text(text)) => (
"ok".to_string(),
json!({"task_id": task.id, "name": task.name, "output": text}),
),
Ok(other) => (
"ok".to_string(),
json!({"task_id": task.id, "name": task.name, "output": format!("{other:?}")}),
),
Err(err) => (
"error".to_string(),
json!({"task_id": task.id, "name": task.name, "error": err.to_string()}),
),
};
let event = UiEvent {
event_type: "wakeup".to_string(),
user_id: task.user_id.clone(),
tool: "wakeup".to_string(),
status: status.clone(),
payload: payload.clone(),
timestamp: run_at,
};
let _ = self.ui_event_tx.send(event);
let _ = write_wakeup_audit_log(
self.audit_log_path.as_deref(),
run_at,
&task,
status.as_str(),
payload.clone(),
);
}
Ok(())
}
}
#[derive(Serialize)]
struct HealthResponse {
status: String,
}
#[derive(Deserialize)]
struct ProcessTextRequest {
user_id: String,
text: String,
prompt: Option<String>,
}
#[derive(Serialize)]
struct ProcessTextResponse {
text: String,
}
#[derive(Deserialize)]
struct MemorySearchRequest {
user_id: String,
query: String,
limit: Option<usize>,
}
#[derive(Deserialize)]
struct ChatHistoryQuery {
user_id: String,
limit: Option<usize>,
}
#[derive(Deserialize)]
struct ClearHistoryRequest {
user_id: String,
}
#[derive(Deserialize)]
struct ClearUserDataRequest {
user_id: String,
}
#[derive(Deserialize)]
struct PreloadBootRequest {
user_id: String,
}
#[derive(Serialize)]
struct PreloadBootResponse {
context_status: String,
heartbeat_status: String,
}
#[derive(Deserialize)]
struct ReminderStreamQuery {
user_id: String,
}
#[derive(Deserialize)]
struct UiEventStreamQuery {
user_id: Option<String>,
}
#[derive(Deserialize)]
struct ReminderDeliveryEventsQuery {
user_id: Option<String>,
limit: Option<usize>,
}
#[derive(Deserialize)]
struct AuditEventsQuery {
user_id: Option<String>,
limit: Option<usize>,
}
#[derive(Deserialize)]
struct InboxQuery {
user_id: String,
limit: Option<usize>,
include_done: Option<bool>,
}
#[derive(Deserialize)]
struct InboxTransitionRequest {
user_id: String,
origin_ref: String,
action: String,
}
#[derive(Serialize)]
struct InboxTransitionResponse {
status: String,
origin_ref: String,
previous_status: String,
next_status: String,
}
#[derive(Serialize, Clone)]
struct InboxItemResponse {
id: String,
source_type: String,
source_id: i32,
title: String,
details: Option<String>,
owner: String,
status: String,
priority: String,
due_at: Option<i64>,
created_at: i64,
updated_at: i64,
requires_human_action: bool,
origin_ref: String,
dependency_refs: Vec<String>,
t_shirt_size: Option<String>,
story_points: Option<i32>,
estimate_optimistic_minutes: Option<i32>,
estimate_likely_minutes: Option<i32>,
estimate_pessimistic_minutes: Option<i32>,
}
#[derive(Serialize)]
struct InboxResponse {
items: Vec<InboxItemResponse>,
}
#[derive(Serialize)]
struct InboxActionableCountResponse {
actionable_count: usize,
}
#[derive(Serialize)]
struct ReminderDeliveryEventsResponse {
events: Vec<Value>,
}
#[derive(Serialize)]
struct AuditEventsResponse {
events: Vec<Value>,
}
#[derive(Serialize)]
struct MemorySearchResponse {
results: Vec<String>,
}
#[derive(Serialize)]
struct ChatHistoryResponse {
history: Vec<String>,
}
#[derive(Serialize)]
struct ClearHistoryResponse {
status: String,
message: String,
}
#[derive(Serialize)]
struct ClearUserDataResponse {
status: String,
message: String,
cleared: Value,
}
#[derive(Serialize)]
struct ErrorResponse {
error: String,
}
#[derive(Deserialize)]
struct SignerIdRequest {
request_id: String,
}
#[derive(Deserialize)]
struct X402PreviewRequest {
request_id: String,
actor: String,
user_id: String,
payment_required: Value,
merchant_origin: Option<String>,
context_requires_approval: Option<bool>,
idempotency_key: Option<String>,
}
#[derive(Serialize)]
struct X402PreviewResponse {
canonical_intent: crate::security::x402::CanonicalX402Intent,
signer: crate::security::signer_daemon::SignerResponse,
}
#[derive(Deserialize)]
struct SolanaWalletQuery {
user_id: String,
actor: Option<String>,
}
#[derive(Serialize)]
struct SolanaWalletResponse {
user_id: String,
actor: String,
address: String,
}
#[derive(Deserialize)]
struct SolanaBalanceQuery {
address: Option<String>,
user_id: Option<String>,
actor: Option<String>,
}
fn asks_for_wallet_address_only(text: &str) -> bool {
let normalized = text.to_ascii_lowercase();
let asks_for_address = normalized.contains("wallet address")
|| normalized.contains("my address")
|| (normalized.contains("wallet") && normalized.contains("address"));
asks_for_address
&& !normalized.contains("balance")
&& !normalized.contains("transfer")
&& !normalized.contains("send")
&& !normalized.contains("transaction")
&& !normalized.contains("tx")
&& !normalized.contains("history")
}
fn asks_for_wallet_balance_only(text: &str) -> bool {
let normalized = text.to_ascii_lowercase();
let asks_for_balance = normalized.contains("balance") || normalized.contains("lamports");
let in_solana_context = normalized.contains("solana") || normalized.contains("wallet");
asks_for_balance
&& in_solana_context
&& !normalized.contains("transfer")
&& !normalized.contains("send")
&& !normalized.contains("transaction")
&& !normalized.contains("tx")
&& !normalized.contains("history")
}
async fn solana_balance_line_for_user(state: &AppState, user_id: &str) -> Result<String> {
let policy = load_solana_rpc_policy(state)?;
let endpoint = require_solana_rpc_endpoint(&policy)?;
let address = crate::security::solana_signer::wallet_address(user_id, "agent")?;
let lamports = crate::solana_rpc::get_balance(&endpoint, &address, &policy.commitment).await?;
let sol = lamports as f64 / 1_000_000_000f64;
Ok(format!(
"Your Solana balance is {:.9} SOL ({} lamports).",
sol, lamports
))
}
#[derive(Serialize)]
struct SolanaBalanceResponse {
address: String,
lamports: u64,
sol: f64,
}
#[derive(Deserialize)]
struct SolanaTransferRequest {
request_id: String,
user_id: String,
actor: Option<String>,
to: String,
lamports: u64,
payee: Option<String>,
simulate_only: Option<bool>,
}
#[derive(Serialize)]
struct SolanaTransferResponse {
status: String,
request_id: String,
wallet_address: String,
signer_reason_code: String,
simulation: Option<Value>,
signature: Option<String>,
}
#[derive(Deserialize)]
struct SolanaTxStatusQuery {
signature: String,
}
#[derive(Serialize)]
struct SolanaTxStatusResponse {
signature: String,
value: Value,
}
#[derive(Deserialize)]
struct SolanaTxHistoryQuery {
address: Option<String>,
user_id: Option<String>,
actor: Option<String>,
limit: Option<usize>,
}
#[derive(Serialize)]
struct SolanaTxHistoryResponse {
address: String,
entries: Value,
}
#[derive(Serialize, Clone)]
struct DoctorCheck {
name: String,
status: String,
message: String,
fix_hint: Option<String>,
}
#[derive(Serialize)]
struct DoctorResponse {
overall: String,
checks: Vec<DoctorCheck>,
}
#[derive(Serialize, Clone)]
struct SecurityAuditFinding {
id: String,
severity: String,
status: String,
message: String,
fix_hint: Option<String>,
auto_fixable: bool,
}
#[derive(Serialize)]
struct SecurityAuditResponse {
overall: String,
findings: Vec<SecurityAuditFinding>,
}
#[derive(Serialize)]
struct FactoryResetConfigResponse {
status: String,
message: String,
config: Value,
}
pub fn build_router(state: AppState) -> Router {
Router::new()
.route("/health", get(health))
.route("/inbox", get(inbox))
.route("/inbox/actionable_count", get(inbox_actionable_count))
.route("/inbox/transition", post(inbox_transition))
.route("/audit/events", get(audit_events))
.route("/reminders/delivery_events", get(reminder_delivery_events))
.route("/doctor", post(doctor))
.route("/security_audit", post(security_audit))
.route("/process_text", post(process_text))
.route("/process_text_stream", post(process_text_stream))
.route("/chat_history", get(chat_history))
.route("/clear_user_history", post(clear_user_history))
.route("/clear_user_data", post(clear_user_data))
.route("/memory_search", post(memory_search))
.route("/preload_boot", post(preload_boot))
.route("/reminder_stream", get(reminder_stream))
.route("/ui_events", get(ui_events))
.route("/factory_reset_config", post(factory_reset_config))
.route("/reload_config", post(reload_config))
.route("/signer/preview", post(signer_preview))
.route("/signer/approve", post(signer_approve))
.route("/signer/sign", post(signer_sign))
.route("/signer/deny", post(signer_deny))
.route("/x402/preview", post(x402_preview))
.route("/solana/wallet", get(solana_wallet))
.route("/solana/balance", get(solana_balance))
.route("/solana/transfer", post(solana_transfer))
.route("/solana/simulate_transfer", post(solana_simulate_transfer))
.route("/solana/tx/status", get(solana_tx_status))
.route("/solana/tx/history", get(solana_tx_history))
.with_state(state)
}
async fn inbox(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<InboxQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let limit = query.limit.unwrap_or(200).clamp(1, 500);
let include_done = query.include_done.unwrap_or(true);
match build_inbox_items(&state.db_path, &query.user_id, limit, include_done).await {
Ok(items) => (StatusCode::OK, Json(InboxResponse { items })).into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn inbox_actionable_count(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<InboxQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
match build_inbox_items(&state.db_path, &query.user_id, 500, false).await {
Ok(items) => {
let actionable_count = items
.iter()
.filter(|item| {
item.owner == "human"
&& item.requires_human_action
&& matches!(
item.status.as_str(),
"new" | "acknowledged" | "in_progress" | "blocked"
)
})
.count();
(
StatusCode::OK,
Json(InboxActionableCountResponse { actionable_count }),
)
.into_response()
}
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
fn parse_inbox_status_state(value: &str) -> Option<InboxState> {
match value.trim().to_ascii_lowercase().as_str() {
"new" => Some(InboxState::New),
"ack" | "acknowledged" => Some(InboxState::Acknowledged),
"in_progress" | "in progress" | "started" => Some(InboxState::InProgress),
"blocked" => Some(InboxState::Blocked),
"done" | "completed" | "complete" => Some(InboxState::Done),
"dismissed" | "archived" => Some(InboxState::Dismissed),
_ => None,
}
}
fn inbox_state_to_str(state: InboxState) -> &'static str {
match state {
InboxState::New => "new",
InboxState::Acknowledged => "acknowledged",
InboxState::InProgress => "in_progress",
InboxState::Blocked => "blocked",
InboxState::Done => "done",
InboxState::Dismissed => "dismissed",
}
}
fn parse_inbox_action(value: &str) -> Option<InboxAction> {
match value.trim().to_ascii_lowercase().as_str() {
"ack" | "acknowledge" => Some(InboxAction::Acknowledge),
"start" => Some(InboxAction::Start),
"block" | "blocked" => Some(InboxAction::Block),
"done" | "complete" => Some(InboxAction::Done),
"reopen" | "undo" | "undone" => Some(InboxAction::Reopen),
"dismiss" | "dismissed" => Some(InboxAction::Dismiss),
"snooze" => Some(InboxAction::Snooze),
_ => None,
}
}
async fn inbox_transition(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<InboxTransitionRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let Some(action) = parse_inbox_action(&payload.action) else {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Unsupported inbox action".to_string(),
}),
)
.into_response();
};
let items = match build_inbox_items(&state.db_path, &payload.user_id, 1000, true).await {
Ok(items) => items,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let Some(item) = items
.iter()
.find(|item| item.origin_ref == payload.origin_ref)
.cloned()
else {
return (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: "Inbox item not found".to_string(),
}),
)
.into_response();
};
let Some(previous_state) = parse_inbox_status_state(&item.status) else {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Invalid current inbox status".to_string(),
}),
)
.into_response();
};
let Some(next_state) = crate::inbox_fsm::transition(previous_state, action) else {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Invalid inbox transition".to_string(),
}),
)
.into_response();
};
if action == InboxAction::Done {
match item.source_type.as_str() {
"reminder" => {
let store = ReminderStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store
.complete_reminder(&payload.user_id, item.source_id)
.await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
"todo" => {
let store = TodoStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store.set_completed(item.source_id, true).await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
"task" => {
let store = TaskStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store.set_enabled(item.source_id, false).await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
_ => {}
}
}
if action == InboxAction::Reopen {
match item.source_type.as_str() {
"reminder" => {
let store = ReminderStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store
.reopen_reminder(&payload.user_id, item.source_id)
.await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
"todo" => {
let store = TodoStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store.set_completed(item.source_id, false).await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
"task" => {
let store = TaskStore::new(&state.db_path).await;
match store {
Ok(store) => {
let _ = store.set_enabled(item.source_id, true).await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
_ => {}
}
}
if action == InboxAction::Snooze && item.source_type == "reminder" {
let store = ReminderStore::new(&state.db_path).await;
match store {
Ok(store) => {
let now = now_ts();
let due_at = item.due_at.unwrap_or(now).max(now) + 15 * 60;
let _ = store
.snooze_reminder(&payload.user_id, item.source_id, due_at)
.await;
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
let state_store = match InboxStateStore::new(&state.db_path).await {
Ok(store) => store,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
if let Err(err) = state_store
.set_status(
&payload.user_id,
&payload.origin_ref,
inbox_state_to_str(next_state),
)
.await
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
let _ = state.ui_event_tx.send(UiEvent {
event_type: "inbox_transition".to_string(),
user_id: payload.user_id.clone(),
tool: "inbox".to_string(),
status: inbox_state_to_str(next_state).to_string(),
payload: json!({
"origin_ref": payload.origin_ref,
"source_type": item.source_type,
"source_id": item.source_id,
"action": payload.action,
"actor": "human",
"reason": "manual_transition",
"from": inbox_state_to_str(previous_state),
"to": inbox_state_to_str(next_state),
"previous_status": inbox_state_to_str(previous_state),
"next_status": inbox_state_to_str(next_state),
}),
timestamp: now_ts(),
});
(
StatusCode::OK,
Json(InboxTransitionResponse {
status: "ok".to_string(),
origin_ref: payload.origin_ref,
previous_status: inbox_state_to_str(previous_state).to_string(),
next_status: inbox_state_to_str(next_state).to_string(),
}),
)
.into_response()
}
async fn reminder_delivery_events(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<ReminderDeliveryEventsQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let config = Config::from_store(&state.db_path).ok();
let Some(path) = reminders_audit_log_path(config.as_ref()) else {
return (
StatusCode::OK,
Json(ReminderDeliveryEventsResponse { events: vec![] }),
)
.into_response();
};
let limit = query.limit.unwrap_or(50).clamp(1, 500);
let content = match tokio::fs::read_to_string(&path).await {
Ok(content) => content,
Err(_) => {
return (
StatusCode::OK,
Json(ReminderDeliveryEventsResponse { events: vec![] }),
)
.into_response()
}
};
let mut events = content
.lines()
.filter_map(|line| serde_json::from_str::<Value>(line).ok())
.filter(|event| {
if let Some(user_id) = &query.user_id {
event
.get("user_id")
.and_then(|value| value.as_str())
.map(|value| value == user_id)
.unwrap_or(false)
} else {
true
}
})
.collect::<Vec<_>>();
if events.len() > limit {
let keep_from = events.len() - limit;
events = events.split_off(keep_from);
}
(
StatusCode::OK,
Json(ReminderDeliveryEventsResponse { events }),
)
.into_response()
}
async fn audit_events(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<AuditEventsQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let config = Config::from_store(&state.db_path).ok();
let Some(path) = ui_event_log_path(config.as_ref()) else {
return (StatusCode::OK, Json(AuditEventsResponse { events: vec![] })).into_response();
};
let limit = query.limit.unwrap_or(200).clamp(1, 2000);
let content = match tokio::fs::read_to_string(&path).await {
Ok(content) => content,
Err(_) => {
return (StatusCode::OK, Json(AuditEventsResponse { events: vec![] })).into_response()
}
};
let mut events = content
.lines()
.filter_map(|line| serde_json::from_str::<Value>(line).ok())
.filter(|event| {
if let Some(user_id) = &query.user_id {
let event_user = event.get("user_id").and_then(|value| value.as_str());
let event_type = event.get("event_type").and_then(|value| value.as_str());
matches!(event_user, Some("system") | Some("daemon"))
|| event_user == Some(user_id.as_str())
|| matches!(event_type, Some("boot") | Some("autonomy"))
} else {
true
}
})
.collect::<Vec<_>>();
if events.len() > limit {
let keep_from = events.len() - limit;
events = events.split_off(keep_from);
}
(StatusCode::OK, Json(AuditEventsResponse { events })).into_response()
}
fn parse_plan_step_title(step: &Value) -> Option<String> {
if let Some(text) = step.as_str() {
let trimmed = text.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
for key in ["title", "description", "name", "text", "step"] {
if let Some(value) = step.get(key).and_then(|v| v.as_str()) {
let trimmed = value.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
}
None
}
fn normalize_unix_timestamp(value: i64) -> Option<i64> {
if value <= 0 {
return None;
}
if value >= 1_000_000_000_000 {
return Some(value / 1000);
}
Some(value)
}
fn parse_yyyy_mm_dd_to_unix(value: &str) -> Option<i64> {
let trimmed = value.trim();
let normalized = trimmed.replace('/', "-");
let mut parts = normalized.split('-');
let year = parts.next()?.trim().parse::<i32>().ok()?;
let month = parts.next()?.trim().parse::<u8>().ok()?;
let day = parts.next()?.trim().parse::<u8>().ok()?;
if parts.next().is_some() {
return None;
}
let date = Date::from_calendar_date(year, time::Month::try_from(month).ok()?, day).ok()?;
let local_offset = UtcOffset::current_local_offset().unwrap_or(UtcOffset::UTC);
Some(
PrimitiveDateTime::new(date, Time::MIDNIGHT)
.assume_offset(local_offset)
.unix_timestamp(),
)
}
fn parse_due_at_value(value: &Value) -> Option<i64> {
match value {
Value::Number(number) => number
.as_i64()
.and_then(normalize_unix_timestamp)
.or_else(|| {
number
.as_u64()
.and_then(|n| normalize_unix_timestamp(n as i64))
}),
Value::String(raw) => {
let trimmed = raw.trim();
if trimmed.is_empty() {
return None;
}
if let Ok(ts) = trimmed.parse::<i64>() {
return normalize_unix_timestamp(ts);
}
if let Some(ts) = parse_yyyy_mm_dd_to_unix(trimmed) {
return Some(normalize_due_year_if_stale(ts, trimmed));
}
if let Some(ts) = parse_due_at_from_text(trimmed, now_ts()) {
return Some(ts);
}
static DATE_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let date_re = DATE_RE.get_or_init(|| Regex::new(r"(\d{4}[-/]\d{2}[-/]\d{2})").unwrap());
let captures = date_re.captures(trimmed)?;
parse_yyyy_mm_dd_to_unix(captures.get(1)?.as_str())
}
_ => None,
}
}
fn normalize_nlp_due_text(input: &str) -> String {
static FROM_START_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
static FROM_NOW_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let from_start_re = FROM_START_RE.get_or_init(|| {
Regex::new(r"(?i)\b(\d+)\s*(day|days|week|weeks|month|months)\s+from\s+start\b").unwrap()
});
let from_now_re = FROM_NOW_RE.get_or_init(|| {
Regex::new(r"(?i)\b(\d+)\s*(day|days|week|weeks|month|months)\s+from\s+now\b").unwrap()
});
let normalized = from_start_re.replace_all(input, "in $1 $2").to_string();
from_now_re.replace_all(&normalized, "in $1 $2").to_string()
}
fn text_has_explicit_time(input: &str) -> bool {
static TIME_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = TIME_RE.get_or_init(|| {
Regex::new(r"(?i)(\b\d{1,2}:\d{2}\b|\b\d{1,2}\s*(am|pm)\b|\bnoon\b|\bmidnight\b)").unwrap()
});
re.is_match(input)
}
fn local_midnight_unix(ts: i64) -> i64 {
let Ok(utc_dt) = time::OffsetDateTime::from_unix_timestamp(ts) else {
return ts;
};
let local_offset = UtcOffset::current_local_offset().unwrap_or(UtcOffset::UTC);
let local_dt = utc_dt.to_offset(local_offset);
let date = local_dt.date();
PrimitiveDateTime::new(date, Time::MIDNIGHT)
.assume_offset(local_offset)
.unix_timestamp()
}
fn extract_explicit_year(input: &str) -> Option<i32> {
static YEAR_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = YEAR_RE.get_or_init(|| Regex::new(r"\b(20\d{2})\b").unwrap());
re.captures(input)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
}
fn with_local_year(ts: i64, year: i32) -> Option<i64> {
let utc_dt = time::OffsetDateTime::from_unix_timestamp(ts).ok()?;
let local_offset = UtcOffset::current_local_offset().unwrap_or(UtcOffset::UTC);
let local_dt = utc_dt.to_offset(local_offset);
let month = local_dt.month();
let mut day = local_dt.day();
let date = loop {
if let Ok(value) = Date::from_calendar_date(year, month, day) {
break value;
}
if day == 1 {
return None;
}
day -= 1;
};
let pdt = PrimitiveDateTime::new(
date,
Time::from_hms(local_dt.hour(), local_dt.minute(), local_dt.second()).ok()?,
);
Some(pdt.assume_offset(local_offset).unix_timestamp())
}
fn normalize_due_year_if_stale(ts: i64, input: &str) -> i64 {
let now = now_ts();
let Some(explicit_year) = extract_explicit_year(input) else {
return ts;
};
let Ok(now_local) = time::OffsetDateTime::from_unix_timestamp(now) else {
return ts;
};
let local_offset = UtcOffset::current_local_offset().unwrap_or(UtcOffset::UTC);
let current_year = now_local.to_offset(local_offset).year();
if explicit_year >= current_year {
return ts;
}
if let Some(mut adjusted) = with_local_year(ts, current_year) {
if adjusted < now {
if let Some(next_year) = with_local_year(ts, current_year + 1) {
adjusted = next_year;
}
}
return adjusted;
}
ts
}
fn parse_due_at_nlp(input: &str, anchor_ts: i64) -> Option<i64> {
let normalized = normalize_nlp_due_text(input);
let anchor = Local
.timestamp_opt(anchor_ts, 0)
.single()
.or_else(|| Local.timestamp_opt(now_ts(), 0).single())
.unwrap_or_else(Local::now);
parse_date_string(&normalized, anchor, Dialect::Us)
.or_else(|_| parse_date_string(&normalized, anchor, Dialect::Uk))
.ok()
.map(|dt: DateTime<Local>| {
let ts = dt.timestamp();
let parsed = if text_has_explicit_time(input) {
ts
} else {
local_midnight_unix(ts)
};
normalize_due_year_if_stale(parsed, input)
})
}
fn parse_due_at_from_text(input: &str, anchor_ts: i64) -> Option<i64> {
if input.trim().is_empty() {
return None;
}
static DATE_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let date_re = DATE_RE.get_or_init(|| {
Regex::new(r"(?i)(?:due(?:\s+date)?|deadline)\s*:\s*(\d{4}[-/]\d{2}[-/]\d{2})").unwrap()
});
if let Some(captures) = date_re.captures(input) {
if let Some(found) = captures.get(1).map(|m| m.as_str()) {
if let Some(ts) = parse_yyyy_mm_dd_to_unix(found) {
return Some(ts);
}
}
}
let fallback_re = DATE_RE.get_or_init(|| Regex::new(r"(\d{4}[-/]\d{2}[-/]\d{2})").unwrap());
if let Some(captures) = fallback_re.captures(input) {
if let Some(ts) = parse_yyyy_mm_dd_to_unix(captures.get(1)?.as_str()) {
return Some(normalize_due_year_if_stale(ts, input));
}
}
parse_due_at_nlp(input, anchor_ts)
}
fn parse_plan_step_due_at(step: &Value, title: &str, anchor_ts: i64) -> Option<i64> {
for key in [
"due_at",
"due_ts",
"due",
"due_date",
"due_on",
"deadline",
"target_date",
] {
if let Some(value) = step.get(key).and_then(parse_due_at_value) {
return Some(value);
}
}
if let Some(description) = step.get("description").and_then(|v| v.as_str()) {
if let Some(ts) = parse_due_at_from_text(description, anchor_ts) {
return Some(ts);
}
}
parse_due_at_from_text(title, anchor_ts)
}
fn parse_plan_step_story_points(step: &Value, text: &str) -> Option<i32> {
for key in ["story_points", "points", "estimate_points"] {
if let Some(value) = step.get(key).and_then(|v| v.as_i64()) {
if value > 0 {
return Some(value as i32);
}
}
}
static STORY_POINTS_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re =
STORY_POINTS_RE.get_or_init(|| Regex::new(r"(?i)story\s*points?\s*:\s*(\d+)").unwrap());
re.captures(text)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
.filter(|value| *value > 0)
}
fn parse_plan_step_t_shirt_size(step: &Value, text: &str) -> Option<String> {
for key in ["t_shirt_size", "tshirt_size", "shirt_size", "size"] {
if let Some(value) = step.get(key).and_then(|v| v.as_str()) {
let normalized = value.trim().to_ascii_uppercase();
if ["XS", "S", "M", "L", "XL", "XXL"].contains(&normalized.as_str()) {
return Some(normalized);
}
}
}
static TSHIRT_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = TSHIRT_RE
.get_or_init(|| Regex::new(r"(?i)t-?shirt\s*size\s*:\s*(XS|S|M|L|XL|XXL)").unwrap());
re.captures(text)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().to_ascii_uppercase())
}
fn parse_time_estimate_minutes_from_text(text: &str) -> Option<i32> {
static WEEK_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
static DAY_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
static HOUR_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
static MIN_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let week_re =
WEEK_RE.get_or_init(|| Regex::new(r"(?i)time\s*estimate\s*:\s*(\d+)\s*weeks?").unwrap());
if let Some(weeks) = week_re
.captures(text)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
{
return Some(weeks.max(1) * 5 * 8 * 60);
}
let day_re =
DAY_RE.get_or_init(|| Regex::new(r"(?i)time\s*estimate\s*:\s*(\d+)\s*days?").unwrap());
if let Some(days) = day_re
.captures(text)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
{
return Some(days.max(1) * 8 * 60);
}
let hour_re =
HOUR_RE.get_or_init(|| Regex::new(r"(?i)time\s*estimate\s*:\s*(\d+)\s*hours?").unwrap());
if let Some(hours) = hour_re
.captures(text)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
{
return Some(hours.max(1) * 60);
}
let min_re =
MIN_RE.get_or_init(|| Regex::new(r"(?i)time\s*estimate\s*:\s*(\d+)\s*minutes?").unwrap());
min_re
.captures(text)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok())
.map(|minutes| minutes.max(1))
}
fn parse_plan_status(value: Option<&str>, default_status: &str) -> String {
match value.unwrap_or("").trim().to_ascii_lowercase().as_str() {
"new" => "new".to_string(),
"ack" | "acknowledged" => "acknowledged".to_string(),
"in_progress" | "in progress" | "started" => "in_progress".to_string(),
"blocked" => "blocked".to_string(),
"done" | "completed" | "complete" => "done".to_string(),
"dismissed" | "archived" => "dismissed".to_string(),
_ => default_status.to_string(),
}
}
fn parse_plan_priority(value: Option<&str>) -> String {
match value.unwrap_or("").trim().to_ascii_lowercase().as_str() {
"low" => "low".to_string(),
"high" => "high".to_string(),
"urgent" | "critical" => "urgent".to_string(),
_ => "normal".to_string(),
}
}
fn parse_plan_priority_from_text(text: &str) -> Option<String> {
static PRIORITY_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = PRIORITY_RE.get_or_init(|| {
Regex::new(r"(?i)priority\s*:\s*(low|normal|medium|high|urgent|critical)").unwrap()
});
let raw = re
.captures(text)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str())?;
Some(parse_plan_priority(Some(raw)))
}
fn parse_plan_owner_from_text(text: &str) -> Option<String> {
static OWNER_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = OWNER_RE.get_or_init(|| Regex::new(r"(?i)owner\s*:\s*(human|agent)").unwrap());
re.captures(text)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().to_ascii_lowercase())
}
fn build_plan_step_alias_map(plan_id: i32, steps: &[Value]) -> HashMap<String, String> {
let mut alias_map = HashMap::new();
for (index, step) in steps.iter().enumerate() {
let origin_ref = format!("plan_step:{plan_id}:{index}");
alias_map.insert(index.to_string(), origin_ref.clone());
alias_map.insert(format!("step {index}"), origin_ref.clone());
alias_map.insert(format!("step {}", index + 1), origin_ref.clone());
for key in ["id", "ref", "key", "code", "step_id"] {
if let Some(value) = step.get(key).and_then(|v| v.as_str()) {
let normalized = value.trim().to_ascii_lowercase();
if !normalized.is_empty() {
alias_map.insert(normalized, origin_ref.clone());
}
}
}
if let Some(title) = parse_plan_step_title(step) {
let normalized = title.trim().to_ascii_lowercase();
if !normalized.is_empty() {
alias_map.insert(normalized.clone(), origin_ref.clone());
let compact = normalized
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c.is_ascii_whitespace() {
c
} else {
' '
}
})
.collect::<String>()
.split_whitespace()
.collect::<Vec<_>>()
.join(" ");
if !compact.is_empty() {
alias_map.insert(compact, origin_ref.clone());
}
}
}
}
alias_map
}
fn parse_plan_step_dependency_refs(
plan_id: i32,
step: &Value,
alias_map: &HashMap<String, String>,
) -> Vec<String> {
fn resolve_alias(alias_map: &HashMap<String, String>, lower: &str) -> Option<String> {
if let Some(mapped) = alias_map.get(lower) {
return Some(mapped.clone());
}
if lower.len() < 4 {
return None;
}
alias_map
.iter()
.filter(|(key, _)| key.contains(lower) || lower.contains(key.as_str()))
.max_by_key(|(key, _)| key.len())
.map(|(_, value)| value.clone())
}
fn push_dependency_ref(
out: &mut Vec<String>,
plan_id: i32,
alias_map: &HashMap<String, String>,
value: &Value,
) {
match value {
Value::Number(number) => {
if let Some(index) = number.as_u64() {
out.push(format!("plan_step:{plan_id}:{index}"));
}
}
Value::String(raw) => {
let trimmed = raw.trim();
if trimmed.is_empty() {
return;
}
let lower = trimmed.to_ascii_lowercase();
if lower.starts_with("plan_step:")
|| lower.starts_with("task:")
|| lower.starts_with("todo:")
|| lower.starts_with("reminder:")
|| lower.starts_with("plan:")
{
out.push(lower);
} else if let Some(mapped) = resolve_alias(alias_map, &lower) {
out.push(mapped.clone());
} else if trimmed.contains(',') || trimmed.contains('|') || trimmed.contains(';') {
for token in trimmed.split([',', '|', ';']) {
push_dependency_ref(
out,
plan_id,
alias_map,
&Value::String(token.to_string()),
);
}
} else if let Ok(index) = trimmed.parse::<u64>() {
out.push(format!("plan_step:{plan_id}:{index}"));
} else if let Some(rest) = lower.strip_prefix("step ") {
if let Ok(index) = rest.trim().parse::<u64>() {
out.push(format!("plan_step:{plan_id}:{index}"));
}
} else {
out.push(lower);
}
}
Value::Array(values) => {
for entry in values {
push_dependency_ref(out, plan_id, alias_map, entry);
}
}
Value::Object(map) => {
if let Some(origin_ref) = map.get("origin_ref").or_else(|| map.get("ref")) {
push_dependency_ref(out, plan_id, alias_map, origin_ref);
} else if let Some(id) = map.get("id") {
push_dependency_ref(out, plan_id, alias_map, id);
} else if let Some(step_index) = map
.get("step_index")
.or_else(|| map.get("index"))
.or_else(|| map.get("step"))
{
push_dependency_ref(out, plan_id, alias_map, step_index);
}
}
_ => {}
}
}
fn push_dependency_refs_from_text(
out: &mut Vec<String>,
plan_id: i32,
alias_map: &HashMap<String, String>,
text: &str,
) {
static DEPENDS_ON_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let re = DEPENDS_ON_RE.get_or_init(|| {
Regex::new(
r"(?i)(?:depends\s*on|dependencies|blocked\s*by|requires|dependency[_\s-]*refs?)\s*:\s*([^\n\r]+)",
)
.unwrap()
});
for caps in re.captures_iter(text) {
if let Some(raw) = caps.get(1).map(|m| m.as_str()) {
for token in raw.split([',', '|', ';']) {
push_dependency_ref(out, plan_id, alias_map, &Value::String(token.to_string()));
}
}
}
}
let mut refs = Vec::new();
for key in [
"dependency_refs",
"dependencyRefs",
"depends_on",
"dependsOn",
"dependencies",
"blocked_by",
"blockedBy",
"requires",
"prerequisite",
"prerequisites",
"after",
] {
if let Some(value) = step.get(key) {
push_dependency_ref(&mut refs, plan_id, alias_map, value);
}
}
if let Some(title) = parse_plan_step_title(step) {
push_dependency_refs_from_text(&mut refs, plan_id, alias_map, &title);
}
if let Some(description) = step.get("description").and_then(|v| v.as_str()) {
push_dependency_refs_from_text(&mut refs, plan_id, alias_map, description);
}
refs.sort();
refs.dedup();
refs
}
fn extract_plan_step_ref_from_details(details: Option<&str>) -> Option<String> {
static PLAN_STEP_REF_RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let text = details?;
let re = PLAN_STEP_REF_RE
.get_or_init(|| Regex::new(r"(?i)planstepref\s*:\s*(plan_step:\d+:\d+)").unwrap());
re.captures(text)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().to_ascii_lowercase())
}
fn priority_rank(priority: &str) -> i32 {
match priority {
"urgent" => 0,
"high" => 1,
"normal" => 2,
_ => 3,
}
}
async fn build_inbox_items(
db_path: &str,
user_id: &str,
limit: usize,
include_done: bool,
) -> Result<Vec<InboxItemResponse>> {
let now = now_ts();
let config_json = Config::from_store(db_path)
.ok()
.and_then(|cfg| cfg.tools)
.unwrap_or(Value::Null);
let reminder_db_path =
resolve_reminder_db_path(&config_json).unwrap_or_else(|| db_path.to_string());
let todo_db_path = resolve_todo_db_path(&config_json).unwrap_or_else(|| db_path.to_string());
let task_db_path = resolve_task_db_path(&config_json).unwrap_or_else(|| db_path.to_string());
let plan_db_path = resolve_plan_db_path(&config_json).unwrap_or_else(|| db_path.to_string());
let reminder_store = ReminderStore::new(&reminder_db_path).await?;
let todo_store = TodoStore::new(&todo_db_path).await?;
let task_store = TaskStore::new(&task_db_path).await?;
let plan_store = PlanStore::new(&plan_db_path).await?;
let reminders = reminder_store
.list_reminders(user_id, crate::reminders::ReminderStatus::All, limit)
.await?;
let todos = todo_store
.list_items(user_id, crate::todo::TodoStatus::All, limit)
.await?;
let tasks = task_store
.list_tasks(user_id, crate::tasks::TaskStatus::All, limit)
.await?;
let plans = plan_store.list_plans(user_id, limit).await?;
let plan_ids: Vec<i32> = plans.iter().map(|plan| plan.id).collect();
let plan_dependency_refs = plan_store
.list_step_dependencies_for_plans(&plan_ids)
.await?;
let status_store = InboxStateStore::new(db_path).await?;
let persisted_statuses = status_store.list_statuses(user_id, 2000).await?;
let mut items = Vec::new();
for reminder in reminders {
let status = if reminder.completed_at.is_some() {
"done"
} else {
"new"
};
if !include_done && status == "done" {
continue;
}
let priority = if reminder.completed_at.is_none() && reminder.due_at <= now {
"high"
} else {
"normal"
};
items.push(InboxItemResponse {
id: format!("reminder:{}", reminder.id),
source_type: "reminder".to_string(),
source_id: reminder.id,
title: reminder.title,
details: Some("Reminder".to_string()),
owner: "human".to_string(),
status: status.to_string(),
priority: priority.to_string(),
due_at: Some(reminder.due_at),
created_at: reminder.created_at,
updated_at: reminder.completed_at.unwrap_or(reminder.created_at),
requires_human_action: reminder.completed_at.is_none(),
origin_ref: format!("reminder:{}", reminder.id),
dependency_refs: vec![],
t_shirt_size: None,
story_points: None,
estimate_optimistic_minutes: None,
estimate_likely_minutes: None,
estimate_pessimistic_minutes: None,
});
}
for todo in todos {
let status = if todo.completed_at.is_some() {
"done"
} else {
"new"
};
if !include_done && status == "done" {
continue;
}
items.push(InboxItemResponse {
id: format!("todo:{}", todo.id),
source_type: "todo".to_string(),
source_id: todo.id,
title: todo.title,
details: todo.notes,
owner: "human".to_string(),
status: status.to_string(),
priority: "normal".to_string(),
due_at: None,
created_at: todo.created_at,
updated_at: todo.updated_at,
requires_human_action: todo.completed_at.is_none(),
origin_ref: format!("todo:{}", todo.id),
dependency_refs: todo.dependency_refs,
t_shirt_size: todo.t_shirt_size,
story_points: todo.story_points,
estimate_optimistic_minutes: todo.estimate_optimistic_minutes,
estimate_likely_minutes: todo.estimate_likely_minutes,
estimate_pessimistic_minutes: todo.estimate_pessimistic_minutes,
});
}
for task in tasks {
let status = if task.enabled { "new" } else { "done" };
if !include_done && status == "done" {
continue;
}
let priority = if task.enabled && task.next_run_at <= now {
"high"
} else {
"normal"
};
items.push(InboxItemResponse {
id: format!("task:{}", task.id),
source_type: "task".to_string(),
source_id: task.id,
title: task.name,
details: Some(task.prompt),
owner: "human".to_string(),
status: status.to_string(),
priority: priority.to_string(),
due_at: Some(task.next_run_at),
created_at: task.created_at,
updated_at: task.updated_at,
requires_human_action: task.enabled,
origin_ref: format!("task:{}", task.id),
dependency_refs: vec![],
t_shirt_size: None,
story_points: None,
estimate_optimistic_minutes: None,
estimate_likely_minutes: None,
estimate_pessimistic_minutes: None,
});
}
for plan in plans {
let mut emitted_step = false;
if let Some(steps) = plan.steps.as_ref().and_then(|value| value.as_array()) {
let step_alias_map = build_plan_step_alias_map(plan.id, steps);
for (index, step) in steps.iter().enumerate() {
let Some(title) = parse_plan_step_title(step) else {
continue;
};
emitted_step = true;
let owner = step
.get("owner")
.and_then(|v| v.as_str())
.unwrap_or("human")
.to_ascii_lowercase();
let owner = parse_plan_owner_from_text(&title).unwrap_or(owner);
let status = parse_plan_status(
step.get("status").and_then(|v| v.as_str()),
if plan.status == "done" { "done" } else { "new" },
);
if !include_done && status == "done" {
continue;
}
let due_at = parse_plan_step_due_at(step, &title, plan.created_at);
let priority = parse_plan_priority(step.get("priority").and_then(|v| v.as_str()));
let priority = parse_plan_priority_from_text(&title).unwrap_or(priority);
let origin_ref = format!("plan_step:{}:{}", plan.id, index);
let dependency_refs = plan_dependency_refs
.get(&origin_ref)
.cloned()
.unwrap_or_else(|| {
parse_plan_step_dependency_refs(plan.id, step, &step_alias_map)
});
let t_shirt_size = parse_plan_step_t_shirt_size(step, &title);
let story_points = parse_plan_step_story_points(step, &title);
let estimate_likely_minutes = step
.get("estimate_likely_minutes")
.and_then(|v| v.as_i64())
.map(|v| v as i32)
.or_else(|| parse_time_estimate_minutes_from_text(&title));
let estimate_optimistic_minutes = step
.get("estimate_optimistic_minutes")
.and_then(|v| v.as_i64())
.map(|v| v as i32)
.or_else(|| {
estimate_likely_minutes.map(|value| ((value as f32) * 0.70).round() as i32)
});
let estimate_pessimistic_minutes = step
.get("estimate_pessimistic_minutes")
.and_then(|v| v.as_i64())
.map(|v| v as i32)
.or_else(|| {
estimate_likely_minutes.map(|value| ((value as f32) * 1.45).round() as i32)
});
items.push(InboxItemResponse {
id: format!("plan_step:{}:{}", plan.id, index),
source_type: "plan_step".to_string(),
source_id: plan.id,
title,
details: Some(format!("Plan: {}", plan.title)),
owner: owner.clone(),
status,
priority,
due_at,
created_at: plan.created_at,
updated_at: plan.updated_at,
requires_human_action: owner != "agent",
origin_ref,
dependency_refs,
t_shirt_size,
story_points,
estimate_optimistic_minutes,
estimate_likely_minutes,
estimate_pessimistic_minutes,
});
}
}
if !emitted_step {
let status = if plan.status == "done" { "done" } else { "new" };
if !include_done && status == "done" {
continue;
}
items.push(InboxItemResponse {
id: format!("plan:{}", plan.id),
source_type: "plan_step".to_string(),
source_id: plan.id,
title: format!("Review plan: {}", plan.title),
details: Some(plan.goal),
owner: "human".to_string(),
status: status.to_string(),
priority: "normal".to_string(),
due_at: None,
created_at: plan.created_at,
updated_at: plan.updated_at,
requires_human_action: true,
origin_ref: format!("plan:{}", plan.id),
dependency_refs: vec![],
t_shirt_size: None,
story_points: None,
estimate_optimistic_minutes: None,
estimate_likely_minutes: None,
estimate_pessimistic_minutes: None,
});
}
}
for item in &mut items {
if let Some(status) = persisted_statuses.get(&item.origin_ref) {
item.status = status.clone();
}
}
let plan_step_refs = items
.iter()
.filter(|item| item.source_type == "plan_step")
.map(|item| item.origin_ref.to_ascii_lowercase())
.collect::<HashSet<_>>();
items.retain(|item| {
!(item.source_type == "todo"
&& extract_plan_step_ref_from_details(item.details.as_deref())
.map(|origin_ref| plan_step_refs.contains(&origin_ref))
.unwrap_or(false))
});
let mut seen_origin_refs = HashSet::new();
items.retain(|item| seen_origin_refs.insert(item.origin_ref.clone()));
if !include_done {
items.retain(|item| item.status != "done" && item.status != "dismissed");
}
items.sort_by(|a, b| {
priority_rank(&a.priority)
.cmp(&priority_rank(&b.priority))
.then_with(|| {
a.due_at
.unwrap_or(i64::MAX)
.cmp(&b.due_at.unwrap_or(i64::MAX))
})
.then_with(|| b.created_at.cmp(&a.created_at))
});
if items.len() > limit {
items.truncate(limit);
}
Ok(items)
}
async fn solana_wallet(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<SolanaWalletQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let actor = query.actor.unwrap_or_else(|| "agent".to_string());
match crate::security::solana_signer::wallet_address(&query.user_id, &actor) {
Ok(address) => (
StatusCode::OK,
Json(SolanaWalletResponse {
user_id: query.user_id,
actor,
address,
}),
)
.into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn solana_balance(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<SolanaBalanceQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let policy = match load_solana_rpc_policy(&state) {
Ok(policy) => policy,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let endpoint = match require_solana_rpc_endpoint(&policy) {
Ok(endpoint) => endpoint,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let address =
match resolve_query_or_wallet_address(query.address, query.user_id, query.actor, "agent") {
Ok(address) => address,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
match crate::solana_rpc::get_balance(&endpoint, &address, &policy.commitment).await {
Ok(lamports) => (
StatusCode::OK,
Json(SolanaBalanceResponse {
address,
lamports,
sol: lamports as f64 / 1_000_000_000f64,
}),
)
.into_response(),
Err(err) => (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn solana_transfer(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<SolanaTransferRequest>,
) -> impl IntoResponse {
execute_solana_transfer(state, headers, payload, false).await
}
async fn solana_simulate_transfer(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<SolanaTransferRequest>,
) -> impl IntoResponse {
execute_solana_transfer(state, headers, payload, true).await
}
async fn execute_solana_transfer(
state: AppState,
headers: HeaderMap,
payload: SolanaTransferRequest,
force_simulation_only: bool,
) -> Response {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let actor = payload.actor.unwrap_or_else(|| "agent".to_string());
let signer_preview = match state.signer_service.process(SignerRequest::Preview {
intent: Box::new(SigningIntent {
request_id: payload.request_id.clone(),
actor: actor.clone(),
user_id: payload.user_id.clone(),
action_type: "solana_transfer".to_string(),
amount_atomic: payload.lamports,
payee: payload
.payee
.clone()
.unwrap_or_else(|| "merchant.local".to_string()),
context_requires_approval: false,
scheme_id: None,
chain_id: Some("solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp".to_string()),
payment_authority: None,
idempotency_key: Some(payload.request_id.clone()),
}),
}) {
Ok(response) => response,
Err(err) => {
return (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
if signer_preview.status != "approved" {
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": "transfer requires approval",
"signer": signer_preview,
})),
)
.into_response();
}
let policy = match load_solana_rpc_policy(&state) {
Ok(policy) => policy,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let endpoint = match require_solana_rpc_endpoint(&policy) {
Ok(endpoint) => endpoint,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let from_seed = match crate::security::solana_signer::signing_seed(&payload.user_id, &actor) {
Ok(seed) => seed,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let latest_blockhash =
match crate::solana_rpc::get_latest_blockhash(&endpoint, &policy.commitment).await {
Ok(hash) => hash,
Err(err) => {
return (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let probe_unit_limit = crate::solana_rpc::probe_compute_unit_limit(&policy);
let (probe_tx_base64, wallet_address) =
match crate::solana_rpc::build_transfer_transaction_base64_with_unit_limit(
&from_seed,
&payload.to,
payload.lamports,
&latest_blockhash,
&policy,
probe_unit_limit,
) {
Ok(tuple) => tuple,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let probe_simulation =
match crate::solana_rpc::simulate_transaction(&endpoint, &probe_tx_base64, &policy).await {
Ok(value) => value,
Err(err) => {
return (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let adjusted_unit_limit = crate::solana_rpc::recommended_compute_unit_limit(
&probe_simulation,
policy.compute_budget.unit_limit,
);
let tx_base64 = if adjusted_unit_limit == probe_unit_limit {
probe_tx_base64
} else {
match crate::solana_rpc::build_transfer_transaction_base64_with_unit_limit(
&from_seed,
&payload.to,
payload.lamports,
&latest_blockhash,
&policy,
adjusted_unit_limit,
) {
Ok((tx, _)) => tx,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
}
};
let simulate_only = force_simulation_only || payload.simulate_only.unwrap_or(false);
let simulation_result = Some(probe_simulation);
if simulate_only {
return (
StatusCode::OK,
Json(SolanaTransferResponse {
status: "simulated".to_string(),
request_id: payload.request_id,
wallet_address,
signer_reason_code: signer_preview.reason_code,
simulation: simulation_result,
signature: None,
}),
)
.into_response();
}
if let Err(err) = state.signer_service.process(SignerRequest::Sign {
request_id: payload.request_id.clone(),
}) {
return (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
match crate::solana_rpc::send_transaction(&endpoint, &tx_base64, &policy).await {
Ok(signature) => (
StatusCode::OK,
Json(SolanaTransferResponse {
status: "submitted".to_string(),
request_id: payload.request_id,
wallet_address,
signer_reason_code: signer_preview.reason_code,
simulation: simulation_result,
signature: Some(signature),
}),
)
.into_response(),
Err(err) => (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn solana_tx_status(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<SolanaTxStatusQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let policy = match load_solana_rpc_policy(&state) {
Ok(policy) => policy,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let endpoint = match require_solana_rpc_endpoint(&policy) {
Ok(endpoint) => endpoint,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
match crate::solana_rpc::get_signature_status(&endpoint, &query.signature).await {
Ok(value) => (
StatusCode::OK,
Json(SolanaTxStatusResponse {
signature: query.signature,
value,
}),
)
.into_response(),
Err(err) => (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn solana_tx_history(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<SolanaTxHistoryQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let policy = match load_solana_rpc_policy(&state) {
Ok(policy) => policy,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let endpoint = match require_solana_rpc_endpoint(&policy) {
Ok(endpoint) => endpoint,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
let address =
match resolve_query_or_wallet_address(query.address, query.user_id, query.actor, "agent") {
Ok(address) => address,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
};
match crate::solana_rpc::get_signatures_for_address(
&endpoint,
&address,
query.limit.unwrap_or(20),
)
.await
{
Ok(entries) => (
StatusCode::OK,
Json(SolanaTxHistoryResponse { address, entries }),
)
.into_response(),
Err(err) => (
StatusCode::BAD_GATEWAY,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn x402_preview(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<X402PreviewRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let context_requires_approval = payload.context_requires_approval.unwrap_or(false);
let (canonical_intent, signing_intent) = match canonicalize_payment_required(
&payload.request_id,
&payload.actor,
&payload.user_id,
&payload.payment_required,
payload.merchant_origin.as_deref(),
context_requires_approval,
payload.idempotency_key.as_deref(),
) {
Ok(value) => value,
Err(err) => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
match state.signer_service.process(SignerRequest::Preview {
intent: Box::new(signing_intent),
}) {
Ok(signer_response) => (
StatusCode::OK,
Json(X402PreviewResponse {
canonical_intent,
signer: signer_response,
}),
)
.into_response(),
Err(err) => (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn signer_preview(
State(state): State<AppState>,
headers: HeaderMap,
Json(intent): Json<SigningIntent>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
match state.signer_service.process(SignerRequest::Preview {
intent: Box::new(intent),
}) {
Ok(response) => (StatusCode::OK, Json(response)).into_response(),
Err(err) => (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn signer_approve(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<SignerIdRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
match state.signer_service.process(SignerRequest::Approve {
request_id: payload.request_id,
}) {
Ok(response) => (StatusCode::OK, Json(response)).into_response(),
Err(err) => (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn signer_sign(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<SignerIdRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
match state.signer_service.process(SignerRequest::Sign {
request_id: payload.request_id,
}) {
Ok(response) => (StatusCode::OK, Json(response)).into_response(),
Err(err) => (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn signer_deny(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<SignerIdRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
match state.signer_service.process(SignerRequest::Deny {
request_id: payload.request_id,
}) {
Ok(response) => (StatusCode::OK, Json(response)).into_response(),
Err(err) => (
StatusCode::FORBIDDEN,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn health() -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok".to_string(),
})
}
async fn doctor(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let checks = run_doctor_checks(&state).await;
let has_fail = checks.iter().any(|check| check.status == "fail");
let has_warn = checks.iter().any(|check| check.status == "warn");
let overall = if has_fail {
"fail"
} else if has_warn {
"warn"
} else {
"pass"
};
(
StatusCode::OK,
Json(DoctorResponse {
overall: overall.to_string(),
checks,
}),
)
.into_response()
}
async fn security_audit(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let findings = run_security_audit_checks(&state).await;
let overall = highest_severity(&findings);
(
StatusCode::OK,
Json(SecurityAuditResponse { overall, findings }),
)
.into_response()
}
fn doctor_check(name: &str, status: &str, message: String, fix_hint: Option<&str>) -> DoctorCheck {
DoctorCheck {
name: name.to_string(),
status: status.to_string(),
message,
fix_hint: fix_hint.map(str::to_string),
}
}
fn security_finding(
id: &str,
severity: &str,
status: &str,
message: String,
fix_hint: Option<&str>,
auto_fixable: bool,
) -> SecurityAuditFinding {
SecurityAuditFinding {
id: id.to_string(),
severity: severity.to_string(),
status: status.to_string(),
message,
fix_hint: fix_hint.map(str::to_string),
auto_fixable,
}
}
fn severity_rank(severity: &str) -> u8 {
match severity {
"critical" => 4,
"high" => 3,
"medium" => 2,
_ => 1,
}
}
fn highest_severity(findings: &[SecurityAuditFinding]) -> String {
findings
.iter()
.filter(|finding| finding.status != "pass")
.max_by_key(|finding| severity_rank(&finding.severity))
.map(|finding| finding.severity.clone())
.unwrap_or_else(|| "low".to_string())
}
async fn run_security_audit_checks(state: &AppState) -> Vec<SecurityAuditFinding> {
let mut findings = Vec::new();
if state.token.trim().is_empty() {
findings.push(security_finding(
"daemon_auth_token",
"medium",
"warn",
"Daemon auth token is empty; this is unexpected because token bootstrap is automatic and protected endpoints fail closed."
.to_string(),
Some("Restart the app/daemon to re-run token bootstrap and verify keyring/secret-store availability."),
false,
));
} else {
findings.push(security_finding(
"daemon_auth_token",
"low",
"pass",
"Daemon auth token is configured.".to_string(),
None,
false,
));
}
match Config::from_store(&state.db_path) {
Ok(config) => {
findings.push(security_finding(
"config_load",
"low",
"pass",
"Config loaded from store/keyring.".to_string(),
None,
false,
));
let root = json!({ "tools": config.tools.clone().unwrap_or(Value::Null) });
let sandbox = SandboxSettings::from_root_config(&root);
let built_in_tools = [
"coding",
"mcp",
"http_call",
"github",
"zapier",
"planning",
"reminders",
"search_internet",
"tasks",
"todo",
"wakeup",
];
let mut non_wasm_tools = Vec::new();
for tool_name in built_in_tools {
let plan = sandbox.execution_plan(tool_name);
if plan.runtime != ToolRuntime::Wasm {
non_wasm_tools.push(tool_name);
}
}
if non_wasm_tools.is_empty() {
findings.push(security_finding(
"tool_runtime_invariant",
"low",
"pass",
"All built-in tools resolve to WASM runtime.".to_string(),
None,
false,
));
} else {
findings.push(security_finding(
"tool_runtime_invariant",
"high",
"fail",
format!(
"Non-WASM tool runtime detected for: {}.",
non_wasm_tools.join(", ")
),
Some(
"Enforce WASM-only execution in sandbox settings and tool runtime planner.",
),
false,
));
}
let default_deny = config
.tools
.as_ref()
.and_then(|tools| tools.get("settings"))
.and_then(|settings| settings.get("permissions"))
.and_then(|permissions| permissions.get("default_deny"))
.and_then(|value| value.as_bool())
.unwrap_or(false);
if default_deny {
findings.push(security_finding(
"network_default_deny",
"low",
"pass",
"Global tools network policy uses default_deny=true.".to_string(),
None,
false,
));
} else {
findings.push(security_finding(
"network_default_deny",
"medium",
"warn",
"Global tools network policy default_deny is disabled or missing."
.to_string(),
Some("Set tools.settings.permissions.default_deny to true and allowlist required domains."),
false,
));
}
}
Err(err) => {
findings.push(security_finding(
"config_load",
"critical",
"fail",
format!("Config load failed: {err}"),
Some("Save a valid config in Config tab and rerun security audit."),
false,
));
}
}
findings
}
async fn run_doctor_checks(state: &AppState) -> Vec<DoctorCheck> {
let mut checks = Vec::new();
if state.token.trim().is_empty() {
checks.push(doctor_check(
"daemon_auth_token",
"warn",
"Daemon auth token is empty; this is unexpected because token bootstrap is automatic and protected endpoints fail closed."
.to_string(),
Some("Restart the app/daemon to re-run token bootstrap and verify keyring/secret-store availability."),
));
} else {
checks.push(doctor_check(
"daemon_auth_token",
"pass",
"Daemon auth token is configured.".to_string(),
None,
));
}
match Config::from_store(&state.db_path) {
Ok(config) => {
checks.push(doctor_check(
"config_store",
"pass",
"Config loaded from store/keyring.".to_string(),
None,
));
match config.clone().resolve_vault() {
Ok(_) => {
checks.push(doctor_check(
"vault_resolution",
"pass",
"Vault-backed secrets resolved successfully.".to_string(),
None,
));
}
Err(err) => {
checks.push(doctor_check(
"vault_resolution",
"fail",
format!("Vault resolution failed: {err}"),
Some("Verify OS keychain access and required secret keys."),
));
}
}
let mode = crate::security::tpm_provider::tpm_mode();
let tpm_available = crate::security::tpm_provider::tpm_available();
if tpm_available {
checks.push(doctor_check(
"security_tpm_mode",
"pass",
format!("TPM capability available (mode={mode})."),
None,
));
checks.push(doctor_check(
"solana_custody",
"pass",
"Secure Solana wallet custody/signing backend is available.".to_string(),
None,
));
} else if mode == "strict" {
checks.push(doctor_check(
"security_tpm_mode",
"fail",
"TPM unavailable while strict mode is enabled.".to_string(),
Some("Switch TPM mode to auto/compatible or restore TPM availability."),
));
checks.push(doctor_check(
"solana_custody",
"warn",
"Solana signing/custody is disabled because secure key backend is unavailable in strict mode."
.to_string(),
Some("Use a machine with TPM support or relax TPM mode for compatibility."),
));
} else {
checks.push(doctor_check(
"security_tpm_mode",
"warn",
format!(
"TPM unavailable; running in compatibility path (mode={mode}) with degraded hardware binding."
),
Some("Use strict mode on TPM-capable hardware for strongest key protection."),
));
checks.push(doctor_check(
"solana_custody",
"warn",
"Solana signing/custody is degraded or disabled without hardware-backed key protection."
.to_string(),
Some("For production custody, run on TPM-capable hardware in strict mode."),
));
}
}
Err(err) => {
checks.push(doctor_check(
"config_store",
"fail",
format!("Config load failed: {err}"),
Some("Save a valid config in the Config tab and retry."),
));
checks.push(doctor_check(
"vault_resolution",
"warn",
"Skipped because config could not be loaded.".to_string(),
Some("Fix config_store check first."),
));
checks.push(doctor_check(
"security_tpm_mode",
"warn",
"Skipped because config could not be loaded.".to_string(),
Some("Fix config_store check first."),
));
checks.push(doctor_check(
"solana_custody",
"warn",
"Skipped because config could not be loaded.".to_string(),
Some("Fix config_store check first."),
));
}
}
let db_path = state.db_path.clone();
let db_check = tokio::task::spawn_blocking(move || -> DoctorCheck {
if let Err(err) = crate::config_store::ensure_parent_dir(&db_path) {
return doctor_check(
"database_access",
"fail",
format!("Database directory check failed: {err}"),
Some("Verify filesystem permissions for DB path."),
);
}
let mut conn = match SqliteConnection::establish(&db_path) {
Ok(conn) => conn,
Err(err) => {
return doctor_check(
"database_access",
"fail",
format!("Database open failed: {err}"),
Some("Verify DB path and SQLite/SQLCipher availability."),
)
}
};
if let Err(err) = crate::db::apply_sqlcipher_key_sync(&mut conn) {
return doctor_check(
"database_access",
"fail",
format!("Database key apply failed: {err}"),
Some("Verify secure secret storage availability for db_encryption_key."),
);
}
let probe_result = diesel::sql_query(
"CREATE TABLE IF NOT EXISTS doctor_probe (id INTEGER PRIMARY KEY, ts INTEGER NOT NULL)",
)
.execute(&mut conn);
match probe_result {
Ok(_) => doctor_check(
"database_access",
"pass",
"Database opened and write probe succeeded.".to_string(),
None,
),
Err(err) => doctor_check(
"database_access",
"fail",
format!("Database write probe failed: {err}"),
Some("Verify DB permissions and SQLCipher key configuration."),
),
}
})
.await;
match db_check {
Ok(check) => checks.push(check),
Err(err) => checks.push(doctor_check(
"database_access",
"fail",
format!("Database check task failed: {err}"),
Some("Retry diagnostics; if persistent, inspect runtime logs."),
)),
}
checks
}
async fn process_text(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<ProcessTextRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
if asks_for_wallet_address_only(&payload.text) {
match crate::security::solana_signer::wallet_address(&payload.user_id, "agent") {
Ok(address) => {
return (
StatusCode::OK,
Json(ProcessTextResponse {
text: format!("Your Solana wallet address is {address}."),
}),
)
.into_response();
}
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
}
}
if asks_for_wallet_balance_only(&payload.text) {
let balance_line = match solana_balance_line_for_user(&state, &payload.user_id).await {
Ok(line) => line,
Err(err) => format!("I couldn't fetch your Solana balance right now: {err}"),
};
return (
StatusCode::OK,
Json(ProcessTextResponse { text: balance_line }),
)
.into_response();
}
let options = ProcessOptions {
prompt: payload.prompt.clone(),
images: Vec::new(),
output_format: OutputFormat::Text,
image_detail: "auto".to_string(),
json_schema: None,
};
let agent = state.agent.read().await.clone();
let response = agent
.process(&payload.user_id, UserInput::Text(payload.text), options)
.await;
match response {
Ok(ProcessResult::Text(text)) => {
(StatusCode::OK, Json(ProcessTextResponse { text })).into_response()
}
Ok(other) => (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Unexpected response: {other:?}"),
}),
)
.into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn process_text_stream(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<ProcessTextRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let agent = state.agent.read().await.clone();
let ProcessTextRequest {
user_id,
text,
prompt,
} = payload;
if asks_for_wallet_address_only(&text) {
let wallet_line = match crate::security::solana_signer::wallet_address(&user_id, "agent") {
Ok(address) => format!("Your Solana wallet address is {address}."),
Err(err) => format!("[error] {}", err),
};
let body = Body::from(wallet_line);
return Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(body)
.unwrap();
}
if asks_for_wallet_balance_only(&text) {
let balance_line = match solana_balance_line_for_user(&state, &user_id).await {
Ok(line) => line,
Err(err) => format!("I couldn't fetch your Solana balance right now: {err}"),
};
let body = Body::from(balance_line);
return Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(body)
.unwrap();
}
let body = Body::from_stream(async_stream::stream! {
let mut stream = agent.process_text_stream(&user_id, &text, prompt.as_deref());
while let Some(item) = stream.next().await {
match item {
Ok(chunk) => {
if !chunk.is_empty() {
yield Ok::<Bytes, std::convert::Infallible>(Bytes::from(chunk));
}
}
Err(err) => {
let message = format!("\n[error] {}", err);
yield Ok(Bytes::from(message));
break;
}
}
}
});
Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(body)
.unwrap()
}
async fn preload_boot(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<PreloadBootRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let agent = state.agent.read().await.clone();
let db_path = state.db_path.clone();
let ui_event_tx = state.ui_event_tx.clone();
let user_id = payload.user_id.clone();
tokio::spawn(async move {
let quick_timeout = Duration::from_secs(2);
let context_status =
match tokio::time::timeout(quick_timeout, agent.preload_context(&user_id)).await {
Ok(Ok(())) => "ok".to_string(),
Ok(Err(err)) => format!("error: {err}"),
Err(_) => {
let agent = agent.clone();
let ui_event_tx = ui_event_tx.clone();
let user_id = user_id.clone();
tokio::spawn(async move {
let status = match agent.preload_context(&user_id).await {
Ok(()) => "ok".to_string(),
Err(err) => format!("error: {err}"),
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: user_id.clone(),
tool: "context".to_string(),
status: status.clone(),
payload: json!({"user_id": user_id, "status": status, "phase": "deferred"}),
timestamp: now_ts(),
});
});
"deferred".to_string()
}
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: user_id.clone(),
tool: "context".to_string(),
status: context_status.clone(),
payload: json!({"user_id": user_id, "status": context_status, "phase": "quick"}),
timestamp: now_ts(),
});
let heartbeat_status = if let Ok(config) = Config::from_store(&db_path) {
let source = config.heartbeat_source;
match tokio::time::timeout(quick_timeout, load_markdown_content(&source)).await {
Ok(Ok(markdown)) => {
agent.set_heartbeat_markdown(markdown.clone()).await;
if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok".to_string()
} else {
"empty".to_string()
}
}
Ok(Err(err)) => format!("error: {err}"),
Err(_) => {
let agent = agent.clone();
let ui_event_tx = ui_event_tx.clone();
let source = source.clone();
tokio::spawn(async move {
let status = match load_markdown_content(&source).await {
Ok(markdown) => {
agent.set_heartbeat_markdown(markdown.clone()).await;
if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok".to_string()
} else {
"empty".to_string()
}
}
Err(err) => format!("error: {err}"),
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: "system".to_string(),
tool: "heartbeat".to_string(),
status: status.clone(),
payload: json!({"status": status, "phase": "deferred"}),
timestamp: now_ts(),
});
});
"deferred".to_string()
}
}
} else {
"config_error".to_string()
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: "system".to_string(),
tool: "heartbeat".to_string(),
status: heartbeat_status.clone(),
payload: json!({"status": heartbeat_status, "phase": "quick"}),
timestamp: now_ts(),
});
let prompt_status = if let Ok(config) = Config::from_store(&db_path) {
let source = config.prompt_source;
match tokio::time::timeout(quick_timeout, load_markdown_content(&source)).await {
Ok(Ok(markdown)) => {
agent.set_prompt_markdown(markdown.clone()).await;
if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok".to_string()
} else {
"empty".to_string()
}
}
Ok(Err(err)) => format!("error: {err}"),
Err(_) => {
let agent = agent.clone();
let ui_event_tx = ui_event_tx.clone();
let source = source.clone();
tokio::spawn(async move {
let status = match load_markdown_content(&source).await {
Ok(markdown) => {
agent.set_prompt_markdown(markdown.clone()).await;
if markdown
.as_ref()
.map(|m| !m.trim().is_empty())
.unwrap_or(false)
{
"ok".to_string()
} else {
"empty".to_string()
}
}
Err(err) => format!("error: {err}"),
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: "system".to_string(),
tool: "prompt".to_string(),
status: status.clone(),
payload: json!({"status": status, "phase": "deferred"}),
timestamp: now_ts(),
});
});
"deferred".to_string()
}
}
} else {
"config_error".to_string()
};
let _ = ui_event_tx.send(UiEvent {
event_type: "boot".to_string(),
user_id: user_id.clone(),
tool: "prompt".to_string(),
status: prompt_status.clone(),
payload: json!({"status": prompt_status}),
timestamp: now_ts(),
});
if (heartbeat_status == "ok"
|| heartbeat_status == "empty"
|| heartbeat_status == "deferred")
&& (prompt_status == "ok" || prompt_status == "empty" || prompt_status == "deferred")
{
let agent = agent.clone();
let ui_event_tx = ui_event_tx.clone();
let user_id = user_id.clone();
tokio::spawn(async move {
run_autonomy_tick(agent, ui_event_tx, user_id, "boot").await;
});
}
});
(
StatusCode::OK,
Json(PreloadBootResponse {
context_status: "started".to_string(),
heartbeat_status: "started".to_string(),
}),
)
.into_response()
}
async fn memory_search(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<MemorySearchRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let limit = payload.limit.unwrap_or(8);
let agent = state.agent.read().await.clone();
let response = agent
.search_memory(&payload.user_id, &payload.query, limit)
.await;
match response {
Ok(results) => (StatusCode::OK, Json(MemorySearchResponse { results })).into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn chat_history(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<ChatHistoryQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let limit = query.limit.unwrap_or(40).clamp(1, 200);
let agent = state.agent.read().await.clone();
let response = agent.get_user_history(&query.user_id, limit).await;
match response {
Ok(history) => (StatusCode::OK, Json(ChatHistoryResponse { history })).into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn clear_user_history(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<ClearHistoryRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let agent = state.agent.read().await.clone();
tracing::info!(
"clear_user_history requested for user_id={}",
payload.user_id
);
match agent.delete_user_history(&payload.user_id).await {
Ok(()) => (
StatusCode::OK,
Json(ClearHistoryResponse {
status: "ok".to_string(),
message: "User history cleared".to_string(),
}),
)
.into_response(),
Err(err) => {
tracing::error!(
"clear_user_history failed for user_id={}: {}",
payload.user_id,
err
);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response()
}
}
}
async fn clear_user_data(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<ClearUserDataRequest>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let user_id = payload.user_id;
let agent = state.agent.read().await.clone();
if let Err(err) = agent.delete_user_history(&user_id).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
let reminders_deleted = match ReminderStore::new(&state.db_path).await {
Ok(store) => match store.delete_all(&user_id, true).await {
Ok(v) => v,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
let todos_deleted = match TodoStore::new(&state.db_path).await {
Ok(store) => match store
.clear_items(&user_id, crate::todo::TodoStatus::All)
.await
{
Ok(v) => v,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
let tasks_deleted = match TaskStore::new(&state.db_path).await {
Ok(store) => match store
.clear_tasks(&user_id, crate::tasks::TaskStatus::All)
.await
{
Ok(v) => v,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
let plans_deleted = match PlanStore::new(&state.db_path).await {
Ok(store) => match store.clear_plans(&user_id).await {
Ok(v) => v,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
let inbox_states_deleted = match InboxStateStore::new(&state.db_path).await {
Ok(store) => match store.clear_statuses(&user_id).await {
Ok(v) => v,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
(
StatusCode::OK,
Json(ClearUserDataResponse {
status: "ok".to_string(),
message: "User work data cleared".to_string(),
cleared: json!({
"history": true,
"reminders": reminders_deleted,
"todos": todos_deleted,
"tasks": tasks_deleted,
"plans": plans_deleted,
"inbox_state_overrides": inbox_states_deleted,
}),
}),
)
.into_response()
}
async fn reminder_stream(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<ReminderStreamQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let store = state.reminder_store.clone();
let user_id = query.user_id;
let mut tick = tokio::time::interval(Duration::from_secs(1));
let body = Body::from_stream(async_stream::stream! {
loop {
tick.tick().await;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
if let Ok(items) = store.due_reminders(&user_id, now, 10).await {
if cfg!(debug_assertions) && !items.is_empty() {
eprintln!(
"Reminder stream emit: user_id={} count={} now={}",
user_id,
items.len(),
now
);
}
for item in items {
let payload = serde_json::json!({
"id": item.id,
"title": item.title,
"due_at": item.due_at,
});
let line = format!("data: {}\n\n", payload);
yield Ok::<Bytes, std::convert::Infallible>(Bytes::from(line));
}
}
}
});
Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.body(body)
.unwrap()
}
async fn reload_config(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let agent =
ButterflyBot::from_store_with_events(&state.db_path, Some(state.ui_event_tx.clone())).await;
match agent {
Ok(agent) => {
let mut guard = state.agent.write().await;
*guard = Arc::new(agent);
(
StatusCode::OK,
Json(json!({"status": "ok", "message": "Config reloaded"})),
)
.into_response()
}
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response(),
}
}
async fn factory_reset_config(
State(state): State<AppState>,
headers: HeaderMap,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let config = Config::convention_defaults(&state.db_path);
if let Err(err) = config_store::save_config(&state.db_path, &config) {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
let config_value = match serde_json::to_value(&config) {
Ok(value) => value,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: err.to_string(),
}),
)
.into_response();
}
};
let pretty = serde_json::to_string_pretty(&config_value).unwrap_or_default();
if let Err(err) = vault::set_secret("app_config_json", &pretty) {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to persist vault config during factory reset: {err}"),
}),
)
.into_response();
}
let mut message = "Config reset to factory defaults".to_string();
match ButterflyBot::from_store_with_events(&state.db_path, Some(state.ui_event_tx.clone()))
.await
{
Ok(agent) => {
let mut guard = state.agent.write().await;
*guard = Arc::new(agent);
}
Err(err) => {
tracing::warn!("factory_reset_config: agent reload failed: {}", err);
message.push_str("; reload failed, restart daemon to apply runtime state");
}
}
(
StatusCode::OK,
Json(FactoryResetConfigResponse {
status: "ok".to_string(),
message,
config: config_value,
}),
)
.into_response()
}
async fn ui_events(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(query): axum::extract::Query<UiEventStreamQuery>,
) -> impl IntoResponse {
if let Err(err) = authorize(&headers, &state.token) {
return err.into_response();
}
let mut receiver = state.ui_event_tx.subscribe();
let filter_user = query.user_id;
let body = Body::from_stream(async_stream::stream! {
loop {
match receiver.recv().await {
Ok(event) => {
if let Some(filter) = &filter_user {
if event.user_id != *filter
&& event.user_id != "system"
&& event.event_type != "boot"
&& event.event_type != "autonomy"
{
continue;
}
}
let payload = serde_json::to_string(&event).unwrap_or_default();
let line = format!("data: {}\n\n", payload);
yield Ok::<Bytes, std::convert::Infallible>(Bytes::from(line));
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(_) => break,
}
}
});
Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.body(body)
.unwrap()
}
fn authorize(
headers: &HeaderMap,
token: &str,
) -> std::result::Result<(), (StatusCode, Json<ErrorResponse>)> {
let expected_token = token.trim();
if expected_token.is_empty() {
return Err((
StatusCode::UNAUTHORIZED,
Json(ErrorResponse {
error: "Unauthorized".to_string(),
}),
));
}
let header = headers
.get("authorization")
.and_then(|value| value.to_str().ok())
.unwrap_or_default();
let api_key = headers
.get("x-api-key")
.and_then(|value| value.to_str().ok())
.unwrap_or_default();
let bearer = header.strip_prefix("Bearer ").unwrap_or("").trim();
let api_key = api_key.trim();
if bearer == expected_token || api_key == expected_token {
Ok(())
} else {
Err((
StatusCode::UNAUTHORIZED,
Json(ErrorResponse {
error: "Unauthorized".to_string(),
}),
))
}
}
fn load_solana_rpc_policy(state: &AppState) -> Result<SolanaRpcExecutionPolicy> {
let config = Config::from_store(&state.db_path)
.unwrap_or_else(|_| Config::convention_defaults(&state.db_path));
SolanaRpcExecutionPolicy::from_config(&config)
}
fn require_solana_rpc_endpoint(policy: &SolanaRpcExecutionPolicy) -> Result<String> {
let endpoint = policy.endpoint.as_deref().unwrap_or("").trim();
if endpoint.is_empty() {
return Err(ButterflyBotError::Config(
"tools.settings.solana.rpc.endpoint must be configured for Solana RPC".to_string(),
));
}
Ok(endpoint.to_string())
}
fn resolve_query_or_wallet_address(
address: Option<String>,
user_id: Option<String>,
actor: Option<String>,
default_actor: &str,
) -> Result<String> {
if let Some(address) = address {
let trimmed = address.trim();
if !trimmed.is_empty() {
return Ok(trimmed.to_string());
}
}
let user_id = user_id.ok_or_else(|| {
ButterflyBotError::Config("user_id is required when address is not provided".to_string())
})?;
let actor = actor.unwrap_or_else(|| default_actor.to_string());
crate::security::solana_signer::wallet_address(&user_id, &actor)
}
fn bootstrap_wallet_targets(config: Option<&Config>) -> Vec<(String, String)> {
let configured = config
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("settings"))
.and_then(|settings| settings.get("solana"))
.and_then(|solana| {
solana.get("bootstrap_wallets").or_else(|| {
solana
.get("rpc")
.and_then(|rpc| rpc.get("bootstrap_wallets"))
})
})
.and_then(|targets| targets.as_array())
.map(|targets| {
targets
.iter()
.filter_map(|entry| {
let user_id = entry
.get("user_id")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?;
let actor = entry
.get("actor")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("agent");
Some((user_id.to_string(), actor.to_string()))
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
if configured.is_empty() {
vec![("user".to_string(), "agent".to_string())]
} else {
configured
}
}
fn bootstrap_solana_wallets(config: Option<&Config>) -> Result<()> {
for (user_id, actor) in bootstrap_wallet_targets(config) {
let address = crate::security::solana_signer::wallet_address(&user_id, &actor)?;
tracing::info!(user_id = %user_id, actor = %actor, address = %address, "Solana wallet bootstrapped");
}
Ok(())
}
fn is_unreadable_database_error(err: &ButterflyBotError) -> bool {
let lowered = err.to_string().to_ascii_lowercase();
lowered.contains("file is not a database")
|| lowered.contains("file is encrypted or is not a database")
|| lowered.contains("database disk image is malformed")
}
fn archive_db_for_recovery(db_path: &str) -> Result<Option<String>> {
let path = std::path::Path::new(db_path);
if !path.exists() {
return Ok(None);
}
let file_name = match path.file_name().and_then(|name| name.to_str()) {
Some(name) if !name.trim().is_empty() => name,
_ => return Ok(None),
};
let stamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|dur| dur.as_secs())
.unwrap_or(0);
let backup_name = format!("{file_name}.recovery-{stamp}.bak");
let backup_path = match path.parent() {
Some(parent) => parent.join(backup_name),
None => return Ok(None),
};
std::fs::rename(path, &backup_path).map_err(|e| {
ButterflyBotError::Runtime(format!(
"failed to archive unreadable database {} -> {}: {e}",
path.to_string_lossy(),
backup_path.to_string_lossy()
))
})?;
Ok(Some(backup_path.to_string_lossy().to_string()))
}
fn load_or_bootstrap_config_with_recovery(db_path: &str) -> Result<Config> {
if let Ok(config) = Config::from_store(db_path) {
return Ok(config);
}
tracing::warn!("No config in store; writing default config for {}", db_path);
let default_config = Config::convention_defaults(db_path);
match config_store::save_config(db_path, &default_config) {
Ok(()) => Ok(default_config),
Err(err) if is_unreadable_database_error(&err) => {
let backup = archive_db_for_recovery(db_path)?;
tracing::warn!(
db_path = %db_path,
backup_path = backup.as_deref().unwrap_or("(none)"),
error = %err,
"Recovered unreadable database by archiving and recreating config store"
);
config_store::save_config(db_path, &default_config)?;
Ok(default_config)
}
Err(err) => Err(err),
}
}
pub async fn run(host: &str, port: u16, db_path: &str, token: &str) -> Result<()> {
run_with_shutdown(host, port, db_path, token, futures::future::pending::<()>()).await
}
pub async fn run_with_shutdown<F>(
host: &str,
port: u16,
db_path: &str,
token: &str,
shutdown: F,
) -> Result<()>
where
F: Future<Output = ()> + Send + 'static,
{
crate::security::hardening::run_startup_self_check()?;
let wasm_dir = crate::wasm_bundle::ensure_bundled_wasm_tools()?;
tracing::info!(
wasm_dir = %wasm_dir.to_string_lossy(),
"Ensured bundled WASM tool modules"
);
let config = load_or_bootstrap_config_with_recovery(db_path)?;
tracing::info!(
"Daemon config: prompt_source={:?}, heartbeat_source={:?}",
config.prompt_source,
config.heartbeat_source
);
bootstrap_solana_wallets(Some(&config))?;
let tick_seconds = Some(&config)
.and_then(|cfg| cfg.brains.as_ref())
.and_then(|brains| brains.get("settings"))
.and_then(|settings| settings.get("tick_seconds"))
.and_then(|value| value.as_u64())
.unwrap_or(60);
let (ui_event_tx, _) = broadcast::channel(256);
if let Some(path) = ui_event_log_path(Some(&config)) {
let mut rx = ui_event_tx.subscribe();
let path = path.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let _ = write_ui_event_log(&path, &event);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
let agent = Arc::new(RwLock::new(Arc::new(
ButterflyBot::from_store_with_events(db_path, Some(ui_event_tx.clone())).await?,
)));
let reminder_db_path = Some(&config)
.and_then(|cfg| serde_json::to_value(cfg).ok())
.and_then(|value| resolve_reminder_db_path(&value))
.unwrap_or_else(|| db_path.to_string());
let reminder_store = Arc::new(ReminderStore::new(reminder_db_path).await?);
let task_store = Arc::new(TaskStore::new(db_path).await?);
let wakeup_store = Arc::new(WakeupStore::new(db_path).await?);
let mut scheduler = Scheduler::new();
scheduler.register_job(Arc::new(BrainTickJob {
agent: agent.clone(),
interval: Duration::from_secs(tick_seconds.max(1)),
}));
let wakeup_poll_seconds = Some(&config)
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("wakeup"))
.and_then(|wakeup| wakeup.get("poll_seconds"))
.and_then(|value| value.as_u64())
.unwrap_or(60);
let autonomy_cooldown_seconds = Some(&config)
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| {
tools
.get("settings")
.and_then(|settings| settings.get("autonomy_cooldown_seconds"))
.and_then(|value| value.as_u64())
.or_else(|| {
tools
.get("wakeup")
.and_then(|wakeup| wakeup.get("autonomy_cooldown_seconds"))
.and_then(|value| value.as_u64())
})
})
.unwrap_or(60);
set_autonomy_cooldown_seconds(autonomy_cooldown_seconds);
scheduler.register_job(Arc::new(WakeupJob {
agent: agent.clone(),
store: wakeup_store.clone(),
interval: Duration::from_secs(wakeup_poll_seconds.max(1)),
ui_event_tx: ui_event_tx.clone(),
audit_log_path: wakeup_audit_log_path(Some(&config)),
heartbeat_source: Some(&config)
.map(|cfg| cfg.heartbeat_source.clone())
.unwrap_or_else(crate::config::MarkdownSource::default_heartbeat),
db_path: db_path.to_string(),
}));
let tasks_poll_seconds = Some(&config)
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("tasks"))
.and_then(|tasks| tasks.get("poll_seconds"))
.and_then(|value| value.as_u64())
.unwrap_or(60);
scheduler.register_job(Arc::new(ScheduledTasksJob {
agent: agent.clone(),
store: task_store.clone(),
interval: Duration::from_secs(tasks_poll_seconds.max(1)),
ui_event_tx: ui_event_tx.clone(),
audit_log_path: tasks_audit_log_path(Some(&config)),
}));
let reminders_poll_seconds = Some(&config)
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("reminders"))
.and_then(|reminders| reminders.get("poll_seconds"))
.and_then(|value| value.as_u64())
.unwrap_or(10);
scheduler.register_job(Arc::new(ReminderDispatchJob {
store: reminder_store.clone(),
interval: Duration::from_secs(reminders_poll_seconds.max(1)),
ui_event_tx: ui_event_tx.clone(),
audit_log_path: reminders_audit_log_path(Some(&config)),
}));
scheduler.start();
let state = AppState {
agent,
reminder_store,
signer_service: SignerService::default(),
token: token.to_string(),
ui_event_tx,
db_path: db_path.to_string(),
};
let app = build_router(state);
let addr = format!("{host}:{port}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
tracing::info!(address = %addr, "Daemon listener bound");
let shutdown = async move {
shutdown.await;
scheduler.stop().await;
};
axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
Ok(())
}
async fn run_autonomy_tick(
agent: Arc<crate::client::ButterflyBot>,
ui_event_tx: broadcast::Sender<UiEvent>,
user_id: String,
source: &str,
) {
let run_at = now_ts();
if let Some(remaining) = try_begin_autonomy_tick(run_at) {
let _ = ui_event_tx.send(UiEvent {
event_type: "autonomy".to_string(),
user_id,
tool: "heartbeat".to_string(),
status: "skipped".to_string(),
payload: json!({
"source": source,
"reason": "cooldown",
"cooldown_remaining_seconds": remaining,
}),
timestamp: run_at,
});
return;
}
let _ = ui_event_tx.send(UiEvent {
event_type: "autonomy".to_string(),
user_id: user_id.clone(),
tool: "heartbeat".to_string(),
status: "started".to_string(),
payload: json!({"source": source}),
timestamp: run_at,
});
let options = ProcessOptions {
prompt: Some(
"AUTONOMY MODE: Heartbeat tick.\n\
Run autonomous checks/actions as needed using tools.\n\
Output requirements:\n\
- Return ONLY one short final status line (max 120 chars).\n\
- Do NOT include Thought, Plan, Action, Observation, Summary, or Reasoning sections.\n\
- Do NOT dump tool call details.\n\
- Good outputs: 'No-op', 'Processed 2 due tasks', 'Updated plans/todos; no urgent actions'."
.to_string(),
),
images: Vec::new(),
output_format: OutputFormat::Text,
image_detail: "auto".to_string(),
json_schema: None,
};
let result = tokio::time::timeout(Duration::from_secs(120), async {
agent
.process(
&user_id,
UserInput::Text("Autonomous heartbeat tick".to_string()),
options,
)
.await
})
.await;
let (status, payload): (String, serde_json::Value) = match result {
Ok(Ok(ProcessResult::Text(text))) => {
let trimmed = text.trim();
let status = if trimmed.is_empty()
|| trimmed.eq_ignore_ascii_case("no-op")
|| trimmed.eq_ignore_ascii_case("noop")
{
"no-op"
} else {
"ok"
};
(
status.to_string(),
json!({"output": text, "source": source}),
)
}
Ok(Ok(_)) => (
"error".to_string(),
json!({"error": "Unexpected non-text response", "source": source}),
),
Ok(Err(err)) => (
"error".to_string(),
json!({"error": err.to_string(), "source": source}),
),
Err(_) => (
"error".to_string(),
json!({"error": "autonomy timeout", "source": source}),
),
};
let _ = ui_event_tx.send(UiEvent {
event_type: "autonomy".to_string(),
user_id,
tool: "heartbeat".to_string(),
status,
payload,
timestamp: now_ts(),
});
}
fn now_ts() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}
fn resolve_notification_icon_path() -> Option<String> {
let mut candidates = Vec::new();
if let Ok(cwd) = std::env::current_dir() {
candidates.push(
cwd.join("assets/icons/hicolor/256x256/apps/butterfly-bot.png")
.to_string_lossy()
.to_string(),
);
candidates.push(
cwd.join("assets/icons/hicolor/128x128/apps/butterfly-bot.png")
.to_string_lossy()
.to_string(),
);
}
if let Ok(exe) = std::env::current_exe() {
if let Some(bin_dir) = exe.parent() {
candidates.push(
bin_dir
.join("../Resources/butterfly-bot.png")
.to_string_lossy()
.to_string(),
);
candidates.push(
bin_dir
.join("../share/icons/hicolor/256x256/apps/butterfly-bot.png")
.to_string_lossy()
.to_string(),
);
}
}
candidates.push("/usr/share/icons/hicolor/256x256/apps/butterfly-bot.png".to_string());
candidates.into_iter().find_map(|path| {
let icon_path = std::path::Path::new(&path);
if !icon_path.exists() {
return None;
}
Some(
std::fs::canonicalize(icon_path)
.unwrap_or_else(|_| icon_path.to_path_buf())
.to_string_lossy()
.to_string(),
)
})
}
fn send_desktop_notification(summary: &str, body: &str) -> bool {
#[cfg(target_os = "linux")]
{
let icon_path = resolve_notification_icon_path();
let mut notification = notify_rust::Notification::new();
notification.summary(summary).body(body);
if let Some(icon) = &icon_path {
notification.icon(icon);
}
notification.show().map(|_| true).unwrap_or_else(|err| {
tracing::warn!(error = %err, "Desktop notification failed");
false
})
}
#[cfg(target_os = "macos")]
{
let icon_path = resolve_notification_icon_path();
let mut cmd = std::process::Command::new("terminal-notifier");
cmd.arg("-title").arg(summary).arg("-message").arg(body);
if let Some(icon) = &icon_path {
let icon_url = format!("file://{icon}");
cmd.arg("-contentImage").arg(icon_url);
}
match cmd.output() {
Ok(output) if output.status.success() => true,
Ok(output) => {
tracing::warn!(
code = output.status.code(),
stderr = %String::from_utf8_lossy(&output.stderr),
"Desktop notification failed via terminal-notifier; trying notify-rust fallback"
);
let mut notification = notify_rust::Notification::new();
notification.summary(summary).body(body);
if let Some(icon) = &icon_path {
notification.icon(icon);
}
notification.show().map(|_| true).unwrap_or_else(|err| {
tracing::warn!(error = %err, "Desktop notification fallback failed");
false
})
}
Err(err) => {
tracing::warn!(
error = %err,
"Desktop notification failed (terminal-notifier not available); trying notify-rust fallback"
);
let mut notification = notify_rust::Notification::new();
notification.summary(summary).body(body);
if let Some(icon) = &icon_path {
notification.icon(icon);
}
notification
.show()
.map(|_| true)
.unwrap_or_else(|fallback_err| {
tracing::warn!(error = %fallback_err, "Desktop notification fallback failed");
false
})
}
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = (summary, body);
return false;
}
}
fn wakeup_audit_log_path(config: Option<&Config>) -> Option<String> {
let path = config
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("wakeup"))
.and_then(|wakeup| wakeup.get("audit_log_path"))
.and_then(|value| value.as_str())
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.or_else(|| Some("./data/wakeup_audit.log".to_string()));
path
}
fn write_wakeup_audit_log(
path: Option<&str>,
ts: i64,
task: &crate::wakeup::WakeupTask,
status: &str,
payload: serde_json::Value,
) -> Result<()> {
let Some(path) = path else {
return Ok(());
};
config_store::ensure_parent_dir(path)?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
let entry = serde_json::json!({
"timestamp": ts,
"task_id": task.id,
"user_id": task.user_id,
"name": task.name,
"prompt": task.prompt,
"status": status,
"payload": payload,
});
let line = serde_json::to_string(&entry)
.map_err(|e| ButterflyBotError::Serialization(e.to_string()))?;
use std::io::Write;
writeln!(file, "{line}").map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
Ok(())
}
fn ui_event_log_path(config: Option<&Config>) -> Option<String> {
config
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("settings"))
.and_then(|settings| settings.get("ui_event_log_path"))
.and_then(|value| value.as_str())
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.or_else(|| Some("./data/ui_events.log".to_string()))
}
fn write_ui_event_log(path: &str, event: &UiEvent) -> Result<()> {
config_store::ensure_parent_dir(path)?;
let payload = serde_json::to_string(event)
.map_err(|e| ButterflyBotError::Serialization(e.to_string()))?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
writeln!(file, "{}", payload).map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
Ok(())
}
fn tasks_audit_log_path(config: Option<&Config>) -> Option<String> {
let path = config
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("tasks"))
.and_then(|tasks| tasks.get("audit_log_path"))
.and_then(|value| value.as_str())
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.or_else(|| Some("./data/tasks_audit.log".to_string()));
path
}
fn reminders_audit_log_path(config: Option<&Config>) -> Option<String> {
config
.and_then(|cfg| cfg.tools.as_ref())
.and_then(|tools| tools.get("reminders"))
.and_then(|reminders| reminders.get("audit_log_path"))
.and_then(|value| value.as_str())
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.or_else(|| Some("./data/reminders_audit.log".to_string()))
}
fn write_reminder_audit_log(
path: Option<&str>,
ts: i64,
user_id: &str,
reminder_id: i32,
status: &str,
payload: serde_json::Value,
) -> Result<()> {
let Some(path) = path else {
return Ok(());
};
config_store::ensure_parent_dir(path)?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
let entry = serde_json::json!({
"timestamp": ts,
"user_id": user_id,
"reminder_id": reminder_id,
"status": status,
"payload": payload,
});
let line = serde_json::to_string(&entry)
.map_err(|e| ButterflyBotError::Serialization(e.to_string()))?;
use std::io::Write;
writeln!(file, "{line}").map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
Ok(())
}
fn write_tasks_audit_log(
path: Option<&str>,
ts: i64,
task: &crate::tasks::ScheduledTask,
status: &str,
payload: serde_json::Value,
) -> Result<()> {
let Some(path) = path else {
return Ok(());
};
config_store::ensure_parent_dir(path)?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
let entry = serde_json::json!({
"timestamp": ts,
"task_id": task.id,
"user_id": task.user_id,
"name": task.name,
"prompt": task.prompt,
"run_at": task.run_at,
"interval_minutes": task.interval_minutes,
"status": status,
"payload": payload,
});
let line = serde_json::to_string(&entry)
.map_err(|e| ButterflyBotError::Serialization(e.to_string()))?;
use std::io::Write;
writeln!(file, "{line}").map_err(|e| ButterflyBotError::Runtime(e.to_string()))?;
Ok(())
}