mod state;
use anyhow::{Context, Result, anyhow};
use async_trait::async_trait;
use chrono::Utc;
use f8s_core::{
AgentKeypair, ApiError, ApproveJoinRequest, ArtifactManifest, CreateThreadRequest,
CreateThreadResponse, FetchMessagesResponse, Invite, JoinThreadRequest, JoinThreadResponse,
LocalMailboxMessage, MailboxState, MessageKind, PendingJoin, SendMessageRequest,
SendMessageResponse, ThreadId, ThreadSecret, build_signed_envelope, decrypt_from_agent,
encrypt_for_agent, encrypt_for_thread, new_thread_secret, sha256_b64, verify_envelope,
};
use incurs::cli::Cli;
use incurs::command::{CommandContext, CommandDef, CommandHandler};
use incurs::output::CommandResult;
use incurs::schema::{FieldMeta, FieldType};
use serde_json::{Value, json};
use state::{ClientState, StoredThread};
use uuid::Uuid;
/// Base URL used by `server` when neither the `server` option nor the
/// `F8S_SERVER` environment variable provides one.
const DEFAULT_SERVER: &str = "http://127.0.0.1:8787";
/// Program entry point: assembles the `f8s` command tree and serves it.
///
/// Registration order is identity, thread, join, `peers`, `send`, mailbox,
/// `bridge`. If serving fails, the error is printed to stderr and the process
/// exits with status 1.
#[tokio::main]
async fn main() {
    // Top-level (ungrouped) commands, built up front so the tree below reads flat.
    let peers_cmd = cmd("peers", "List approved peers", Handler::Peers)
        .args(vec![field("thread", "Thread id", true)])
        .options(server_options())
        .done();

    let send_cmd = cmd("send", "Post an encrypted message", Handler::Send)
        .args(vec![field("thread", "Thread id", true)])
        .options(vec![
            opt("text", "Text body"),
            opt("image", "Image/file path"),
            server_opt(),
        ])
        .done();

    let bridge_cmd = cmd(
        "bridge",
        "Print released messages as guarded agent input",
        Handler::Bridge,
    )
    .args(vec![field("thread", "Thread id", true)])
    .options(vec![
        opt("provider", "codex or claude"),
        opt("mode", "Only guarded is supported"),
    ])
    .done();

    let cli = Cli::create("f8s")
        .description("Secure mailbox threads for local agents")
        .version(env!("CARGO_PKG_VERSION"))
        .group(identity_group())
        .group(thread_group())
        .group(join_group())
        .command("peers", peers_cmd)
        .command("send", send_cmd)
        .group(mailbox_group())
        .command("bridge", bridge_cmd);

    let Err(err) = cli.serve().await else {
        return;
    };
    eprintln!("{err}");
    std::process::exit(1);
}
/// Builds the `identity` command group with its `init` and `show` commands.
fn identity_group() -> Cli {
    let init = cmd(
        "init",
        "Create or replace a local agent identity",
        Handler::IdentityInit,
    )
    .options(vec![opt("as", "Agent handle")])
    .done();

    let show = cmd(
        "show",
        "Show the local public identity",
        Handler::IdentityShow,
    )
    .done();

    Cli::create("identity")
        .description("Local identity commands")
        .command("init", init)
        .command("show", show)
}
/// Builds the `thread` command group with its `create` and `join` commands.
fn thread_group() -> Cli {
    let create = cmd(
        "create",
        "Create a thread and print an invite once",
        Handler::ThreadCreate,
    )
    .options(vec![opt("as", "Agent handle"), server_opt()])
    .done();

    let join = cmd(
        "join",
        "Request to join a thread from an invite",
        Handler::ThreadJoin,
    )
    .args(vec![field("invite", "f8s invite", true)])
    .options(vec![opt("as", "Agent handle"), server_opt()])
    .done();

    Cli::create("thread")
        .description("Thread commands")
        .command("create", create)
        .command("join", join)
}
/// Builds the `join` command group, which currently holds only `approve`.
fn join_group() -> Cli {
    let approve_args = vec![
        field("thread", "Thread id", true),
        field("join_id", "Join id", true),
    ];
    let approve = cmd("approve", "Approve a pending join", Handler::JoinApprove)
        .args(approve_args)
        .options(server_options())
        .done();

    Cli::create("join")
        .description("Join approval commands")
        .command("approve", approve)
}
/// Builds the `mailbox` command group: `fetch`, `inspect`, `release` and
/// `quarantine`, registered in that order.
fn mailbox_group() -> Cli {
    // inspect / release / quarantine all take the same two required arguments.
    let message_args = || {
        vec![
            field("thread", "Thread id", true),
            field("message_id", "Envelope id", true),
        ]
    };

    let fetch = cmd(
        "fetch",
        "Fetch encrypted envelopes into local quarantine",
        Handler::MailboxFetch,
    )
    .args(vec![field("thread", "Thread id", true)])
    .options(server_options())
    .done();

    let inspect = cmd(
        "inspect",
        "Verify, decrypt, and inspect a fetched message",
        Handler::MailboxInspect,
    )
    .args(message_args())
    .done();

    let release = cmd(
        "release",
        "Release an inspected message to guarded bridge input",
        Handler::MailboxRelease,
    )
    .args(message_args())
    .done();

    let quarantine = cmd(
        "quarantine",
        "Block a message from agent/tool use",
        Handler::MailboxQuarantine,
    )
    .args(message_args())
    .done();

    Cli::create("mailbox")
        .description("Local quarantine mailbox commands")
        .command("fetch", fetch)
        .command("inspect", inspect)
        .command("release", release)
        .command("quarantine", quarantine)
}
/// Starts a [`CommandBuilderLite`] for `name` with no arguments or options yet.
fn cmd(name: &'static str, description: &'static str, handler: Handler) -> CommandBuilderLite {
    let (args, options) = (Vec::new(), Vec::new());
    CommandBuilderLite {
        name,
        description,
        handler,
        args,
        options,
    }
}
/// Small local builder that gathers a command's metadata before it is turned
/// into a `CommandDef` by `done`.
struct CommandBuilderLite {
    /// Name passed to `CommandDef::build`.
    name: &'static str,
    /// Description attached to the built command.
    description: &'static str,
    /// Handler variant passed to `CommandDef::build`.
    handler: Handler,
    /// Positional-argument metadata; empty until `args` is called.
    args: Vec<FieldMeta>,
    /// Option metadata; empty until `options` is called.
    options: Vec<FieldMeta>,
}
impl CommandBuilderLite {
    /// Replaces the positional-argument metadata and hands the builder back.
    fn args(self, args: Vec<FieldMeta>) -> Self {
        Self { args, ..self }
    }

    /// Replaces the option metadata and hands the builder back.
    fn options(self, options: Vec<FieldMeta>) -> Self {
        Self { options, ..self }
    }

    /// Consumes the builder and produces the finished `CommandDef`, with the
    /// description set and the collected args/options installed.
    fn done(self) -> CommandDef {
        let Self {
            name,
            description,
            handler,
            args,
            options,
        } = self;
        CommandDef::build(name, handler)
            .description(description)
            .done_with_fields(args, options)
    }
}
/// Extension trait: finish a builder into a `CommandDef` while supplying the
/// argument and option field metadata in the same step.
trait DoneWithFields {
    /// Consumes `self` and returns a `CommandDef` carrying `args` and `options`.
    fn done_with_fields(self, args: Vec<FieldMeta>, options: Vec<FieldMeta>) -> CommandDef;
}
impl DoneWithFields for incurs::command::CommandBuilder {
    /// Finishes the underlying builder, then overwrites the resulting
    /// definition's `args_fields` and `options_fields` with the given metadata.
    fn done_with_fields(self, args: Vec<FieldMeta>, options: Vec<FieldMeta>) -> CommandDef {
        let mut built = self.done();
        built.options_fields = options;
        built.args_fields = args;
        built
    }
}
/// One variant per CLI command; `run_handler` maps each variant to the async
/// function that implements it.
#[derive(Clone, Copy)]
enum Handler {
    /// `identity init`: create or replace the local agent identity.
    IdentityInit,
    /// `identity show`: show the local public identity.
    IdentityShow,
    /// `thread create`: create a thread and print an invite.
    ThreadCreate,
    /// `thread join`: request to join a thread from an invite.
    ThreadJoin,
    /// `join approve`: approve a pending join.
    JoinApprove,
    /// `peers`: list approved peers.
    Peers,
    /// `send`: post an encrypted message.
    Send,
    /// `mailbox fetch`: fetch envelopes into the local mailbox.
    MailboxFetch,
    /// `mailbox inspect`: verify, decrypt, and inspect a fetched message.
    MailboxInspect,
    /// `mailbox release`: release an inspected message to bridge input.
    MailboxRelease,
    /// `mailbox quarantine`: block a message from agent/tool use.
    MailboxQuarantine,
    /// `bridge`: print released messages as guarded agent input.
    Bridge,
}
#[async_trait]
impl CommandHandler for Handler {
    /// Runs the command for this variant and converts the outcome into a
    /// `CommandResult`.
    ///
    /// Success carries the JSON value produced by the handler. Any failure is
    /// reported with code `F8S_ERROR`, exit code 1 and `retryable: false`.
    async fn run(&self, ctx: CommandContext) -> CommandResult {
        match run_handler(*self, ctx).await {
            Ok(data) => CommandResult::Ok { data, cta: None },
            Err(err) => CommandResult::Error {
                code: "F8S_ERROR".to_string(),
                // `{:#}` prints the whole anyhow context chain ("read x: No such
                // file ..."); plain `to_string()` shows only the outermost
                // context and hides the underlying cause.
                message: format!("{err:#}"),
                retryable: false,
                exit_code: Some(1),
                cta: None,
            },
        }
    }
}
/// Dispatches `handler` to the async function that implements that command,
/// passing `ctx` along (except for `identity show`, which takes no context).
async fn run_handler(handler: Handler, ctx: CommandContext) -> Result<Value> {
    let outcome = match handler {
        // Local identity.
        Handler::IdentityShow => identity_show().await,
        Handler::IdentityInit => identity_init(ctx).await,
        // Thread membership.
        Handler::ThreadCreate => thread_create(ctx).await,
        Handler::ThreadJoin => thread_join(ctx).await,
        Handler::JoinApprove => join_approve(ctx).await,
        Handler::Peers => peers(ctx).await,
        // Messaging.
        Handler::Send => send(ctx).await,
        Handler::Bridge => bridge(ctx).await,
        // Local mailbox.
        Handler::MailboxFetch => mailbox_fetch(ctx).await,
        Handler::MailboxInspect => mailbox_inspect(ctx).await,
        Handler::MailboxRelease => mailbox_release(ctx).await,
        Handler::MailboxQuarantine => mailbox_quarantine(ctx).await,
    };
    outcome
}
/// `identity init`: generates a new keypair (handle from `--as`, defaulting to
/// `"agent"`), stores it as the state's identity (overwriting any previous
/// one), saves the state, and returns the public identity as JSON.
async fn identity_init(ctx: CommandContext) -> Result<Value> {
    let handle = match opt_str(&ctx, "as") {
        Some(chosen) => chosen.to_string(),
        None => "agent".to_string(),
    };
    let generated = AgentKeypair::generate(handle);

    let mut state = ClientState::load().await?;
    state.identity = Some(generated.export);
    state.save().await?;

    let public = state.identity_public()?;
    Ok(json!({ "identity": public }))
}
/// `identity show`: loads the client state and returns its public identity
/// under the `"identity"` key.
async fn identity_show() -> Result<Value> {
    let state = ClientState::load().await?;
    let public = state.identity_public()?;
    Ok(json!({ "identity": public }))
}
/// `thread create`: registers a new thread with the server and records it in
/// local state.
///
/// If the state has no identity yet, one is generated first (handle from
/// `--as`, default `"agent"`). A fresh thread id, invite and thread secret are
/// created, the create request is posted to `/v1/threads`, and only after the
/// server answers is the thread (with its secret) stored and the state saved.
/// The returned JSON includes the invite and a warning that it is shown once.
async fn thread_create(ctx: CommandContext) -> Result<Value> {
    let mut state = ClientState::load().await?;
    if state.identity.is_none() {
        let handle = opt_str(&ctx, "as").unwrap_or("agent").to_string();
        let generated = AgentKeypair::generate(handle);
        state.identity = Some(generated.export);
    }
    let keypair = state.keypair()?;

    let thread_id = ThreadId::new();
    let invite = Invite::new(thread_id.clone());
    let thread_secret = new_thread_secret(thread_id.clone());

    let invite_verifier = invite.verifier();
    let creator = keypair.identity();
    let creator_signature = keypair.sign_json(&thread_id.0)?;
    let request = CreateThreadRequest {
        thread_id: thread_id.clone(),
        invite_verifier,
        creator,
        creator_signature,
    };
    let response: CreateThreadResponse = post_json(&ctx, "/v1/threads", &request).await?;

    // Persist locally only once the server has accepted the thread.
    let stored = StoredThread {
        thread_id: thread_id.clone(),
        secret: Some(thread_secret),
        last_seq: 0,
    };
    state.threads.insert(thread_id.0.clone(), stored);
    state.save().await?;

    Ok(json!({
        "thread_id": response.thread_id,
        "invite": invite.expose(),
        "first_member": response.first_member,
        "warning": "invite is shown once; send it out of band"
    }))
}
/// `thread join`: submits a join request for the thread named by an invite.
///
/// Parses the `invite` argument, generates an identity if the state has none
/// (handle from `--as`, default `"agent"`), posts the request to
/// `/v1/threads/{id}/join`, then makes sure the thread has a local entry
/// (without a secret, `last_seq` 0) and saves the state. An existing entry for
/// the thread is left untouched. Returns the server response as JSON.
async fn thread_join(ctx: CommandContext) -> Result<Value> {
    let invite = Invite::parse(arg_str(&ctx, "invite")?)?;

    let mut state = ClientState::load().await?;
    if state.identity.is_none() {
        let handle = opt_str(&ctx, "as").unwrap_or("agent").to_string();
        let generated = AgentKeypair::generate(handle);
        state.identity = Some(generated.export);
    }
    let keypair = state.keypair()?;

    let request = JoinThreadRequest {
        thread_id: invite.thread_id.clone(),
        invite_secret: invite.secret,
        agent: keypair.identity(),
        agent_signature: keypair.sign_json(&keypair.identity())?,
    };
    let endpoint = format!("/v1/threads/{}/join", invite.thread_id.0);
    let response: JoinThreadResponse = post_json(&ctx, &endpoint, &request).await?;

    // No thread secret is stored at this point.
    let key = invite.thread_id.0.clone();
    let placeholder = StoredThread {
        thread_id: invite.thread_id,
        secret: None,
        last_seq: 0,
    };
    state.threads.entry(key).or_insert(placeholder);
    state.save().await?;

    Ok(json!(response))
}
/// `join approve`: approves a pending join by sending the joiner the thread
/// secret inside a signed welcome envelope.
///
/// Looks up the pending join with the given `join_id` in the server's list for
/// the thread, serializes the locally stored thread secret, encrypts it for the
/// joiner's encryption public key, wraps it in a signed `Welcome` envelope and
/// posts it to the approve endpoint. Returns the server's JSON reply.
///
/// Fails when `join_id` is not a valid UUID, when there is no local keypair or
/// thread secret, or when the server does not list the join.
async fn join_approve(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;
    let join_id = Uuid::parse_str(arg_str(&ctx, "join_id")?)?;

    let state = ClientState::load().await?;
    let keypair = state.keypair()?;
    let secret = state.thread_secret(thread)?;

    let pending: Vec<PendingJoin> =
        get_json(&ctx, &format!("/v1/threads/{thread}/joins")).await?;
    let Some(join) = pending
        .into_iter()
        .find(|candidate| candidate.join_id == join_id)
    else {
        return Err(anyhow!("join not found: {join_id}"));
    };

    let welcome_json = serde_json::to_vec(secret)?;
    let (ciphertext, nonce) =
        encrypt_for_agent(&keypair, &join.agent.encryption_public_key, &welcome_json)?;
    let welcome = build_signed_envelope(
        &keypair,
        ThreadId(thread.to_string()),
        secret.epoch,
        MessageKind::Welcome,
        ciphertext,
        nonce,
        None,
    )?;

    let request = ApproveJoinRequest {
        approver: keypair.identity(),
        welcome,
    };
    let endpoint = format!("/v1/threads/{thread}/joins/{join_id}/approve");
    post_json::<_, Value>(&ctx, &endpoint, &request).await
}
/// `peers`: returns the server's JSON answer for `/v1/threads/{thread}/peers`.
async fn peers(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;
    let endpoint = format!("/v1/threads/{thread}/peers");
    let listing: Value = get_json(&ctx, &endpoint).await?;
    Ok(listing)
}
/// `send`: encrypts a text body with the thread key and posts it as a signed
/// `Text` envelope.
///
/// The body is `--text` when given. Otherwise, with `--image`, the file is read
/// and a placeholder line containing its path, SHA-256 and byte length is sent
/// instead of the file contents. With neither option the command fails.
/// `--text` wins when both are supplied.
async fn send(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;

    let text = match (opt_str(&ctx, "text"), opt_str(&ctx, "image")) {
        (Some(text), _) => text.to_string(),
        (None, Some(path)) => {
            let bytes = tokio::fs::read(path)
                .await
                .with_context(|| format!("read {path}"))?;
            format!(
                "[encrypted artifact placeholder] path={} sha256={} bytes={}",
                path,
                sha256_b64(&bytes),
                bytes.len()
            )
        }
        (None, None) => return Err(anyhow!("send requires --text or --image")),
    };

    let state = ClientState::load().await?;
    let keypair = state.keypair()?;
    let secret = state.thread_secret(thread)?;

    let (ciphertext, nonce) = encrypt_for_thread(&secret.thread_key, text.as_bytes())?;
    let envelope = build_signed_envelope(
        &keypair,
        ThreadId(thread.to_string()),
        secret.epoch,
        MessageKind::Text,
        ciphertext,
        nonce,
        None::<ArtifactManifest>,
    )?;

    let endpoint = format!("/v1/threads/{thread}/messages");
    let payload = SendMessageRequest { envelope };
    let response: SendMessageResponse = post_json(&ctx, &endpoint, &payload).await?;
    Ok(json!(response))
}
/// `mailbox fetch`: downloads envelopes newer than the thread's stored cursor
/// and places them in the local mailbox in the fetched state.
///
/// The request asks for messages after the thread's `last_seq` (0 when the
/// thread is unknown locally). Every returned envelope is inserted into the
/// mailbox keyed by its envelope id, and the thread's `last_seq` is advanced.
/// Returns the thread id and the number of envelopes fetched.
async fn mailbox_fetch(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;
    let mut state = ClientState::load().await?;
    let after = state.threads.get(thread).map(|t| t.last_seq).unwrap_or(0);
    let response: FetchMessagesResponse = get_json(
        &ctx,
        &format!("/v1/threads/{thread}/messages?after={after}"),
    )
    .await?;
    let mut fetched = 0_u64;
    for envelope in response.messages {
        if let Some(seq) = envelope.seq {
            let stored = state
                .threads
                .entry(thread.to_string())
                .or_insert_with(|| StoredThread {
                    thread_id: ThreadId(thread.to_string()),
                    secret: None,
                    last_seq: 0,
                });
            // Only ever move the cursor forward: if the response is not in
            // ascending seq order, a plain assignment would rewind it and the
            // next fetch would re-download envelopes already stored.
            stored.last_seq = stored.last_seq.max(seq);
        }
        state.mailbox.insert(
            envelope.envelope_id.to_string(),
            LocalMailboxMessage::fetched(envelope),
        );
        fetched += 1;
    }
    state.save().await?;
    Ok(json!({ "thread_id": thread, "fetched": fetched }))
}
/// `mailbox inspect`: verifies and decrypts one fetched message and records
/// the inspection result in the local mailbox.
///
/// Two paths:
/// * The envelope is a `Welcome` and no secret is stored for the thread yet:
///   the envelope is verified, its payload is decrypted with the local keypair
///   and parsed as a `ThreadSecret`, the secret is stored for the thread, and
///   the message itself is quarantined (its plaintext is not released).
/// * Otherwise the message is inspected with the stored thread secret.
///
/// Fails when the message id is unknown, when the envelope belongs to a
/// different thread than the `thread` argument, or when verification,
/// decryption or parsing fails. On failure nothing is saved.
async fn mailbox_inspect(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;
    let message_id = arg_str(&ctx, "message_id")?;
    let mut state = ClientState::load().await?;
    let msg = state
        .mailbox
        .remove(message_id)
        .ok_or_else(|| anyhow!("message not found: {message_id}"))?;
    // The envelope carries its own thread id. Without this check a welcome from
    // one thread could install its key under whatever thread name was typed.
    if msg.envelope.thread_id.0 != thread {
        return Err(anyhow!(
            "message {message_id} does not belong to thread {thread}"
        ));
    }
    let inspected =
        if msg.envelope.kind == MessageKind::Welcome && state.thread_secret(thread).is_err() {
            verify_envelope(&msg.envelope)?;
            let keypair = state.keypair()?;
            let bytes = decrypt_from_agent(
                &keypair,
                &msg.envelope.sender.encryption_public_key,
                &msg.envelope.nonce,
                &msg.envelope.ciphertext,
            )?;
            let welcome_secret: ThreadSecret = serde_json::from_slice(&bytes)?;
            // Keep the fetch cursor where it is if it is already past the
            // welcome; resetting it to the welcome's seq would make the next
            // fetch re-download later envelopes and overwrite their local state.
            let known_seq = state.threads.get(thread).map(|t| t.last_seq).unwrap_or(0);
            state.threads.insert(
                thread.to_string(),
                StoredThread {
                    thread_id: ThreadId(thread.to_string()),
                    secret: Some(welcome_secret),
                    last_seq: known_seq.max(msg.envelope.seq.unwrap_or(0)),
                },
            );
            msg.verify()?
                .quarantine("welcome envelope imported thread key; plaintext not released")?
        } else {
            let secret = state.thread_secret(thread)?.clone();
            msg.inspect(&secret)?
        };
    let decision = inspected.inspection.clone();
    state.mailbox.insert(message_id.to_string(), inspected);
    state.save().await?;
    Ok(json!({ "message_id": message_id, "inspection": decision }))
}
/// `mailbox release`: moves the named message to the `Released` state.
async fn mailbox_release(ctx: CommandContext) -> Result<Value> {
    let target = MailboxState::Released;
    transition_message(ctx, target).await
}
/// `mailbox quarantine`: moves the named message to the `Quarantined` state.
async fn mailbox_quarantine(ctx: CommandContext) -> Result<Value> {
    let target = MailboxState::Quarantined;
    transition_message(ctx, target).await
}
/// Shared implementation of `mailbox release` and `mailbox quarantine`.
///
/// Takes the message with the given `message_id` out of the mailbox, applies
/// the transition selected by `to`, puts the result back under the same id and
/// saves the state. Only `Released` and `Quarantined` are accepted; any other
/// target yields an "unsupported transition" error. Only the `message_id`
/// argument is read here; the `thread` argument is not consulted.
async fn transition_message(ctx: CommandContext, to: MailboxState) -> Result<Value> {
    let message_id = arg_str(&ctx, "message_id")?;
    let mut state = ClientState::load().await?;

    let Some(msg) = state.mailbox.remove(message_id) else {
        return Err(anyhow!("message not found: {message_id}"));
    };

    let transitioned = match to {
        MailboxState::Quarantined => msg.quarantine("manual quarantine")?,
        MailboxState::Released => msg.release()?,
        _ => return Err(anyhow!("unsupported transition")),
    };

    let state_name = transitioned.state.clone();
    state.mailbox.insert(message_id.to_string(), transitioned);
    state.save().await?;

    Ok(json!({ "message_id": message_id, "state": state_name }))
}
/// `bridge`: returns the released messages of a thread as agent input.
///
/// Selects mailbox entries whose envelope belongs to `thread` and whose state
/// is `Released`, and emits sender handle, sequence number and plaintext for
/// each, tagged `"remote_untrusted_user_content"`. Messages are ordered by
/// sequence number (entries without one come first). `provider` defaults to
/// `"codex"` and `mode` to `"guarded"`; both are echoed in the output.
async fn bridge(ctx: CommandContext) -> Result<Value> {
    let thread = arg_str(&ctx, "thread")?;
    let state = ClientState::load().await?;
    let mut released: Vec<_> = state
        .mailbox
        .values()
        .filter(|msg| msg.envelope.thread_id.0 == thread && msg.state == MailboxState::Released)
        .collect();
    // The mailbox is keyed by envelope id, so its iteration order is unrelated
    // to conversation order; present the messages in sequence order instead.
    released.sort_by_key(|msg| msg.envelope.seq);
    let messages: Vec<Value> = released
        .into_iter()
        .map(|msg| {
            json!({
                "from": msg.envelope.sender.handle,
                "seq": msg.envelope.seq,
                "content": msg.plaintext,
                "trust": "remote_untrusted_user_content"
            })
        })
        .collect();
    Ok(json!({
        "provider": opt_str(&ctx, "provider").unwrap_or("codex"),
        "mode": opt_str(&ctx, "mode").unwrap_or("guarded"),
        "messages": messages
    }))
}
/// Sends a GET for `path` (appended to the configured server base URL), signs
/// it when a local keypair is available, and decodes the JSON reply as `T`.
async fn get_json<T>(ctx: &CommandContext, path: &str) -> Result<T>
where
    T: serde::de::DeserializeOwned,
{
    let request = reqwest::Client::new().get(format!("{}{}", server(ctx), path));
    let signed = sign_request(request, "GET", path, b"").await?;
    let reply = signed.send().await?;
    parse_response(reply).await
}
/// Serializes `body` as JSON, POSTs it to `path` on the configured server,
/// signs the request when a local keypair is available, and decodes the JSON
/// reply as `R`.
async fn post_json<T: serde::Serialize, R: serde::de::DeserializeOwned>(
    ctx: &CommandContext,
    path: &str,
    body: &T,
) -> Result<R> {
    let url = format!("{}{}", server(ctx), path);
    let bytes = serde_json::to_vec(body)?;
    let builder = reqwest::Client::new()
        .post(url)
        .header("content-type", "application/json");
    // Sign over the borrowed bytes first, then move them into the request, so
    // the serialized body does not need to be cloned.
    let builder = sign_request(builder, "POST", path, &bytes).await?;
    let response = builder.body(bytes).send().await?;
    parse_response(response).await
}
/// Adds the `x-f8s-*` authentication headers to `builder`.
///
/// The signed payload is five newline-separated fields: the method, the path
/// with any query string removed, the SHA-256 of the body, the current Unix
/// timestamp and a fresh UUID nonce. When the loaded state yields no keypair
/// the builder is returned unchanged, i.e. the request goes out unsigned.
async fn sign_request(
    builder: reqwest::RequestBuilder,
    method: &str,
    path: &str,
    body: &[u8],
) -> Result<reqwest::RequestBuilder> {
    let state = ClientState::load().await?;
    let keypair = match state.keypair() {
        Ok(keypair) => keypair,
        // No usable identity: send the request without signature headers.
        Err(_) => return Ok(builder),
    };

    let timestamp = Utc::now().timestamp().to_string();
    let nonce = Uuid::new_v4().to_string();
    // Only the part before the first '?' is covered by the signature.
    let canonical_path = path.split_once('?').map_or(path, |(before, _)| before);
    let body_digest = sha256_b64(body);
    let payload = format!(
        "{}\n{}\n{}\n{}\n{}",
        method, canonical_path, body_digest, timestamp, nonce
    );
    let signature = keypair.sign_bytes(payload.as_bytes())?;

    let signed = builder
        .header("x-f8s-agent-id", keypair.identity().agent_id)
        .header("x-f8s-timestamp", timestamp)
        .header("x-f8s-nonce", nonce)
        .header("x-f8s-signature", signature);
    Ok(signed)
}
/// Turns an HTTP response into `T` or an error.
///
/// A success status decodes the body as JSON into `T`. Any other status is
/// reported as `"{code}: {message}"`, taken from the body when it parses as an
/// `ApiError`, and otherwise from a fallback with code `HTTP_ERROR` and the
/// status line as the message.
async fn parse_response<T: serde::de::DeserializeOwned>(response: reqwest::Response) -> Result<T> {
    let status = response.status();
    if !status.is_success() {
        // Build the fallback lazily so its two Strings are only allocated when
        // the body is not a well-formed `ApiError`.
        let api_error = response
            .json::<ApiError>()
            .await
            .unwrap_or_else(|_| ApiError {
                code: "HTTP_ERROR".to_string(),
                message: status.to_string(),
            });
        return Err(anyhow!("{}: {}", api_error.code, api_error.message));
    }
    Ok(response.json().await?)
}
/// Resolves the server base URL.
///
/// Precedence: the `server` option, then the `F8S_SERVER` environment
/// variable, then `DEFAULT_SERVER`. Trailing `/` characters are removed,
/// because request paths already start with `/` and a base such as
/// `http://host/` would otherwise produce `http://host//v1/...`.
fn server(ctx: &CommandContext) -> String {
    let mut base = opt_str(ctx, "server")
        .map(ToString::to_string)
        .or_else(|| std::env::var("F8S_SERVER").ok())
        .unwrap_or_else(|| DEFAULT_SERVER.to_string());
    let trimmed_len = base.trim_end_matches('/').len();
    base.truncate(trimmed_len);
    base
}
/// Returns the positional argument `name` as a string slice.
///
/// Fails with `missing argument: {name}` when the argument is absent or its
/// value is not a JSON string.
fn arg_str<'a>(ctx: &'a CommandContext, name: &str) -> Result<&'a str> {
    match ctx.args.get(name).and_then(Value::as_str) {
        Some(text) => Ok(text),
        None => Err(anyhow!("missing argument: {name}")),
    }
}
fn opt_str<'a>(ctx: &'a CommandContext, name: &str) -> Option<&'a str> {
ctx.options.get(name).and_then(Value::as_str)
}
/// Builds string-typed field metadata for an argument or option.
///
/// The CLI-facing name is `name` with every `_` replaced by `-`. No default,
/// alias or environment variable is set, and the field is not deprecated.
fn field(name: &'static str, description: &'static str, required: bool) -> FieldMeta {
    let cli_name = name.replace('_', "-");
    FieldMeta {
        name,
        cli_name,
        description: Some(description),
        field_type: FieldType::String,
        required,
        deprecated: false,
        default: None,
        alias: None,
        env_name: None,
    }
}
/// Builds metadata for a non-required string field; shorthand for [`field`]
/// with `required` set to `false`.
fn opt(name: &'static str, description: &'static str) -> FieldMeta {
    let required = false;
    field(name, description, required)
}
/// Metadata for the optional `server` option shared by networked commands.
fn server_opt() -> FieldMeta {
    let (name, description) = ("server", "Worker base URL");
    opt(name, description)
}
/// Option list for commands whose only option is `server`.
fn server_options() -> Vec<FieldMeta> {
    let mut options = Vec::with_capacity(1);
    options.push(server_opt());
    options
}