1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
mod agent;
mod call;
mod channel;
mod config;
mod context_compression;
mod daily_log;
mod heartbeat;
mod heartbeat_config;
mod mcp_client;
mod memory_compaction;
mod provider;
mod serve;
mod session;
mod tools;
mod workspace;
use agent::Agent;
use anyhow::{Context, Result};
use channel::discord::DiscordChannel;
use channel::matrix::MatrixChannel;
use clap::{Parser, Subcommand};
use config::Config;
use daily_log::catchup_pending_logs;
use heartbeat::Heartbeat;
use provider::anthropic::AnthropicProvider;
use sapphire_workspace::{AppContext, Workspace as SwWorkspace, WorkspaceState};
static APP_CTX: AppContext = AppContext::new("sapphire-agent");
use session::SessionStore;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing_subscriber::{EnvFilter, fmt};
use workspace::Workspace;
#[derive(Parser)]
#[command(
name = "sapphire-agent",
about = "Personal AI assistant — Anthropic + Matrix/Discord"
)]
struct Cli {
/// Path to config file (default: ~/.config/sapphire-agent/config.toml)
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
#[command(subcommand)]
command: Option<Command>,
}
#[derive(Subcommand)]
enum Command {
/// Start the agent — Matrix/Discord channels + HTTP API server (default)
Serve {
/// Override bind address (e.g. 127.0.0.1:9000)
#[arg(long, value_name = "ADDR")]
bind: Option<String>,
},
/// Validate the config file and exit
Verify,
/// Interactive session with a running serve server
Call {
/// Server base URL
#[arg(long, default_value = "http://localhost:9000")]
server: String,
/// Grain-id of an existing session to resume (e.g. a3b7k9p)
#[arg(long)]
session: Option<String>,
/// List available API sessions and exit
#[arg(long)]
list: bool,
/// Send a single message and exit instead of entering the REPL.
/// Useful as a CJK-safe fallback or for IDE/editor integration.
#[arg(short, long, value_name = "TEXT")]
message: Option<String>,
/// Dump the session history and exit (no message sent, no REPL).
/// Intended for IDE integrations restoring a session.
#[arg(long)]
history: bool,
/// Emit machine-readable JSON output. Applies to --list, --history,
/// and --message; ignored in REPL mode.
#[arg(long)]
json: bool,
},
}
#[tokio::main]
async fn main() -> Result<()> {
fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
// `call` needs no config file — handle before loading config
if let Some(Command::Call {
server,
session,
list,
message,
history,
json,
}) = cli.command
{
return call::run(server, session, list, message, history, json).await;
}
let config_path = cli.config.unwrap_or_else(Config::default_path);
let config = Config::load(&config_path)
.with_context(|| format!("Failed to load config from {}", config_path.display()))?;
match cli.command.unwrap_or(Command::Serve { bind: None }) {
Command::Verify => {
let workspace_dir = config.resolved_workspace_dir(&config_path);
println!("Config OK");
if let Some(m) = &config.matrix {
println!(" Channel : matrix");
println!(" Matrix homeserver : {}", m.homeserver);
println!(" Matrix user_id : {}", m.user_id);
println!(" Matrix rooms : {:?}", m.room_ids);
} else if let Some(d) = &config.discord {
println!(" Channel : discord");
println!(" Discord channels : {:?}", d.channel_ids);
println!(" Allowed users : {:?}", d.allowed_users);
} else {
println!(" Channel : NONE (add [discord] or [matrix] to config)");
}
println!(" Anthropic model : {}", config.anthropic.model);
println!(" Anthropic max_tok : {}", config.anthropic.max_tokens);
println!(" Workspace dir : {}", workspace_dir.display());
println!(
" Day boundary hour : {}:00 local",
config.day_boundary_hour
);
println!(" Heartbeat enabled : {}", config.heartbeat_enabled);
println!(" Standby mode : {}", config.standby_mode);
println!();
let workspace_files = [
("AGENTS.md / AGENT.md", vec!["AGENTS.md", "AGENT.md"]),
("SOUL.md", vec!["SOUL.md"]),
("IDENTITY.md", vec!["IDENTITY.md"]),
("USER.md", vec!["USER.md"]),
("TOOLS.md", vec!["TOOLS.md"]),
("BOOTSTRAP.md", vec!["BOOTSTRAP.md"]),
("MEMORY.md", vec!["MEMORY.md", "memory.md"]),
];
for (label, candidates) in &workspace_files {
let found = candidates.iter().find_map(|f| {
let p = workspace_dir.join(f);
if p.exists() { Some(*f) } else { None }
});
match found {
Some(f) => println!(" {label:<28} found ({f})"),
None => println!(" {label:<28} -"),
}
}
}
Command::Serve { bind } => {
let workspace_dir = config.resolved_workspace_dir(&config_path);
// ── Bootstrap file loader (AGENTS.md, SOUL.md, MEMORY.md …) ────
let workspace = Arc::new(Workspace::new(workspace_dir.clone()));
// ── sapphire-workspace (search, file ops, git sync) ─────────────
let sw_workspace = SwWorkspace::resolve(&APP_CTX, Some(&workspace_dir))
.context("Failed to resolve sapphire-workspace")?;
// Use the [sync] section from the agent config directly.
// WorkspaceConfig was removed in sapphire-workspace 0.8.0;
// open_configured now takes &SyncConfig.
let sync_config = config.sync.clone().unwrap_or_default();
let ws_sync_interval = sync_config.sync_interval();
let ws_state = WorkspaceState::open_configured(sw_workspace, &sync_config)
.context("Failed to open WorkspaceState")?;
if let Err(e) = ws_state.periodic_sync() {
tracing::warn!("Initial workspace sync failed: {e}");
}
let ws_state = Arc::new(Mutex::new(ws_state));
// ── Periodic workspace sync (if enabled in workspace config) ────
if let Some(dur) = ws_sync_interval {
tracing::info!("Periodic workspace sync enabled: every {}s", dur.as_secs());
let ws = Arc::clone(&ws_state);
tokio::spawn(async move {
let mut tick = tokio::time::interval(dur);
tick.tick().await; // skip immediate fire
loop {
tick.tick().await;
let state = ws.lock().expect("ws_state mutex poisoned");
match state.periodic_sync() {
Ok((u, r)) => {
tracing::info!("Periodic ws sync: {u} upserted, {r} removed");
}
Err(e) => tracing::warn!("Periodic ws sync failed: {e:#}"),
}
}
});
}
// ── Tools ───────────────────────────────────────────────────────
let tool_set = tools::default_tool_set(
Arc::clone(&ws_state),
config.tools.tavily_api_key.clone(),
&config.tools.mcp_servers,
)
.await;
// ── Session store base directory ────────────────────────────────
let sessions_base = config.resolved_sessions_dir(&workspace_dir);
// ── Provider ────────────────────────────────────────────────────
let provider: Arc<dyn provider::Provider> =
Arc::new(AnthropicProvider::new(&config.anthropic));
// ── API session store (sessions/api/) ───────────────────────────
let api_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.join("api"),
Arc::clone(&ws_state),
));
if config.standby_mode {
tracing::info!(
"Standby mode enabled: git sync only, skipping channel and heartbeat"
);
}
// Captured below so main can await the agent's graceful shutdown
// (summarize_on_shutdown) before returning. Without this, the
// tokio runtime drops the spawned task the moment serve::run
// returns, cancelling any in-flight LLM call (#48).
let mut agent_handle: Option<tokio::task::JoinHandle<()>> = None;
// ── Channel + Agent (Matrix or Discord, if configured) ──────────
if !config.standby_mode && (config.matrix.is_some() || config.discord.is_some()) {
let channel_name = if config.discord.is_some() {
"discord"
} else {
"matrix"
};
let channel_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.join(channel_name),
Arc::clone(&ws_state),
));
let channel: Arc<dyn channel::Channel> = if let Some(d) = &config.discord {
Arc::new(
DiscordChannel::new(d).context("Failed to initialise Discord channel")?,
)
} else if let Some(m) = &config.matrix {
Arc::new(MatrixChannel::new(m))
} else {
unreachable!()
};
// ── Catch up on any pending daily logs ──────────────────────
catchup_pending_logs(
&channel_session_store,
provider.as_ref(),
&ws_state,
&workspace_dir,
config.day_boundary_hour,
)
.await;
// ── Agent ───────────────────────────────────────────────────
let agent = Arc::new(Agent::new(
config.clone(),
channel,
Arc::clone(&provider),
Arc::clone(&workspace),
Some(Arc::clone(&tool_set)),
Arc::clone(&channel_session_store),
));
agent.bootstrap().await;
// ── Heartbeat (day-boundary + cron loops) ───────────────────
let default_room_id = config
.matrix
.as_ref()
.and_then(|m| m.primary_room_id().map(str::to_string))
.or_else(|| {
config
.discord
.as_ref()
.and_then(|d| d.channel_ids.first().cloned())
});
let heartbeat = Heartbeat {
workspace_dir: workspace_dir.clone(),
ws_state: Arc::clone(&ws_state),
day_boundary_hour: config.day_boundary_hour,
daily_log_enabled: config.daily_log_enabled,
memory_compaction_enabled: config.memory_compaction_enabled,
session_store: Arc::clone(&channel_session_store),
provider: Arc::clone(&provider),
agent: Arc::clone(&agent),
default_room_id,
};
if config.heartbeat_enabled {
heartbeat.spawn();
} else {
tracing::info!("Heartbeat disabled by config");
}
let agent_run = Arc::clone(&agent);
agent_handle = Some(tokio::spawn(async move {
if let Err(e) = agent_run.run().await {
tracing::error!("Agent error: {e:#}");
}
}));
}
if config.standby_mode {
// In standby mode, keep the process alive for periodic git
// sync only — no HTTP server, no channel, no heartbeat.
tracing::info!("Standby mode: waiting for shutdown signal (Ctrl-C)");
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl-C");
tracing::info!("Shutting down standby process");
} else {
// ── HTTP API server ─────────────────────────────────────────
let addr = bind
.or_else(|| {
config
.serve
.as_ref()
.map(|s| format!("{}:{}", s.host, s.port))
})
.unwrap_or_else(|| "127.0.0.1:9000".to_string());
serve::run(
addr,
config,
provider,
workspace,
tool_set,
api_session_store,
)
.await?;
}
// Wait for the agent task's graceful shutdown to finish so its
// summarize_on_shutdown LLM call isn't aborted by runtime drop.
if let Some(handle) = agent_handle {
if let Err(e) = handle.await {
tracing::warn!("Agent task did not finish cleanly: {e}");
}
}
}
Command::Call { .. } => unreachable!(),
}
Ok(())
}