mod state;
use anyhow::{Context, Result, anyhow};
use async_trait::async_trait;
use chrono::Utc;
use f8s_core::{
AgentKeypair, ApiError, ApproveJoinRequest, ArtifactManifest, CreateThreadRequest,
CreateThreadResponse, FetchMessagesResponse, Invite, JoinThreadRequest, JoinThreadResponse,
LocalMailboxMessage, MailboxState, MessageKind, PendingJoin, SendMessageRequest,
SendMessageResponse, ThreadId, ThreadSecret, build_signed_envelope, decrypt_from_agent,
encrypt_for_agent, encrypt_for_thread, new_thread_secret, sha256_b64, verify_envelope,
};
use incurs::cli::Cli;
use incurs::command::{CommandContext, CommandDef, CommandHandler};
use incurs::output::CommandResult;
use incurs::schema::{FieldMeta, FieldType};
use serde_json::{Value, json};
use state::{ClientState, StoredThread};
use uuid::Uuid;
const DEFAULT_SERVER: &str = "http://127.0.0.1:8787";
#[tokio::main]
async fn main() {
let cli = Cli::create("f8s")
.description("Secure mailbox threads for local agents. Fetch remote messages into quarantine, inspect them locally, then release only safe messages to the agent bridge")
.version(env!("CARGO_PKG_VERSION"))
.group(identity_group())
.group(thread_group())
.group(join_group())
.command(
"peers",
cmd("peers", "List approved peers and pending join requests for a thread", Handler::Peers)
.args(vec![field("thread", "Thread id", true)])
.options(server_options())
.done(),
)
.command(
"send",
cmd("send", "Post an encrypted message or image manifest to approved peers in a thread. Plaintext is encrypted locally before upload", Handler::Send)
.args(vec![field("thread", "Thread id", true)])
.options(vec![
opt("text", "Text body"),
opt("image", "Image/file path"),
server_opt(),
])
.done(),
)
.group(mailbox_group())
.command(
"bridge",
cmd(
"bridge",
"Print only released mailbox messages as guarded, untrusted agent input. Fetched, inspected, and quarantined messages are not bridged",
Handler::Bridge,
)
.args(vec![field("thread", "Thread id", true)])
.options(vec![
opt("provider", "codex or claude"),
opt("mode", "Only guarded is supported"),
])
.done(),
);
if let Err(err) = cli.serve().await {
eprintln!("{err}");
std::process::exit(1);
}
}
fn identity_group() -> Cli {
Cli::create("identity")
.description(
"Manage the local agent identity used to sign joins, approvals, and mailbox messages",
)
.command(
"init",
cmd(
"init",
"Create or replace the local signing and encryption identity for this agent",
Handler::IdentityInit,
)
.options(vec![opt("as", "Agent handle")])
.done(),
)
.command(
"show",
cmd(
"show",
"Show the local public identity and handle for sharing or verification",
Handler::IdentityShow,
)
.done(),
)
}
fn thread_group() -> Cli {
Cli::create("thread")
.description("Create or join secure mailbox threads. The creator shares the invite out of band and must approve join requests before peers can communicate")
.command(
"create",
cmd(
"create",
"Create a new unguessable thread and print a one-time invite to share out of band",
Handler::ThreadCreate,
)
.options(vec![opt("as", "Agent handle"), server_opt()])
.done(),
)
.command(
"join",
cmd(
"join",
"Request to join a thread from an invite. The thread creator must approve the request before communication",
Handler::ThreadJoin,
)
.args(vec![field("invite", "f8s invite", true)])
.options(vec![opt("as", "Agent handle"), server_opt()])
.done(),
)
}
fn join_group() -> Cli {
Cli::create("join")
.description("Approve pending join requests after the user confirms the peer identity")
.command(
"approve",
cmd(
"approve",
"Approve a pending join and broadcast the encrypted thread secret to that peer",
Handler::JoinApprove,
)
.args(vec![
field("thread", "Thread id", true),
field("join_id", "Join id", true),
])
.options(server_options())
.done(),
)
}
fn mailbox_group() -> Cli {
Cli::create("mailbox")
.description("Local quarantine workflow. Fetch remote envelopes, inspect and decrypt locally, then release only messages that should be visible to an agent")
.command(
"fetch",
cmd(
"fetch",
"Fetch encrypted envelopes into local quarantine without exposing their plaintext to the agent",
Handler::MailboxFetch,
)
.args(vec![field("thread", "Thread id", true)])
.options(server_options())
.done(),
)
.command(
"inspect",
cmd(
"inspect",
"Verify signatures, decrypt locally, and inspect a fetched message for prompt-injection risk",
Handler::MailboxInspect,
)
.args(vec![
field("thread", "Thread id", true),
field("message_id", "Envelope id", true),
])
.done(),
)
.command(
"release",
cmd(
"release",
"Release a message only after inspection allows it so it can appear in guarded bridge output",
Handler::MailboxRelease,
)
.args(vec![
field("thread", "Thread id", true),
field("message_id", "Envelope id", true),
])
.done(),
)
.command(
"quarantine",
cmd(
"quarantine",
"Block a message from agent or tool use when inspection or user review rejects it",
Handler::MailboxQuarantine,
)
.args(vec![
field("thread", "Thread id", true),
field("message_id", "Envelope id", true),
])
.done(),
)
}
fn cmd(name: &'static str, description: &'static str, handler: Handler) -> CommandBuilderLite {
CommandBuilderLite {
name,
description,
handler,
args: vec![],
options: vec![],
}
}
struct CommandBuilderLite {
name: &'static str,
description: &'static str,
handler: Handler,
args: Vec<FieldMeta>,
options: Vec<FieldMeta>,
}
impl CommandBuilderLite {
fn args(mut self, args: Vec<FieldMeta>) -> Self {
self.args = args;
self
}
fn options(mut self, options: Vec<FieldMeta>) -> Self {
self.options = options;
self
}
fn done(self) -> CommandDef {
CommandDef::build(self.name, self.handler)
.description(self.description)
.done_with_fields(self.args, self.options)
}
}
trait DoneWithFields {
fn done_with_fields(self, args: Vec<FieldMeta>, options: Vec<FieldMeta>) -> CommandDef;
}
impl DoneWithFields for incurs::command::CommandBuilder {
fn done_with_fields(self, args: Vec<FieldMeta>, options: Vec<FieldMeta>) -> CommandDef {
let mut def = self.done();
def.args_fields = args;
def.options_fields = options;
def
}
}
#[derive(Clone, Copy)]
enum Handler {
IdentityInit,
IdentityShow,
ThreadCreate,
ThreadJoin,
JoinApprove,
Peers,
Send,
MailboxFetch,
MailboxInspect,
MailboxRelease,
MailboxQuarantine,
Bridge,
}
#[async_trait]
impl CommandHandler for Handler {
async fn run(&self, ctx: CommandContext) -> CommandResult {
match run_handler(*self, ctx).await {
Ok(value) => CommandResult::Ok {
data: value,
cta: None,
},
Err(err) => CommandResult::Error {
code: "F8S_ERROR".to_string(),
message: err.to_string(),
retryable: false,
exit_code: Some(1),
cta: None,
},
}
}
}
async fn run_handler(handler: Handler, ctx: CommandContext) -> Result<Value> {
match handler {
Handler::IdentityInit => identity_init(ctx).await,
Handler::IdentityShow => identity_show().await,
Handler::ThreadCreate => thread_create(ctx).await,
Handler::ThreadJoin => thread_join(ctx).await,
Handler::JoinApprove => join_approve(ctx).await,
Handler::Peers => peers(ctx).await,
Handler::Send => send(ctx).await,
Handler::MailboxFetch => mailbox_fetch(ctx).await,
Handler::MailboxInspect => mailbox_inspect(ctx).await,
Handler::MailboxRelease => mailbox_release(ctx).await,
Handler::MailboxQuarantine => mailbox_quarantine(ctx).await,
Handler::Bridge => bridge(ctx).await,
}
}
async fn identity_init(ctx: CommandContext) -> Result<Value> {
let handle = opt_str(&ctx, "as").unwrap_or("agent").to_string();
let keypair = AgentKeypair::generate(handle);
let mut state = ClientState::load().await?;
state.identity = Some(keypair.export);
state.save().await?;
Ok(json!({ "identity": state.identity_public()? }))
}
async fn identity_show() -> Result<Value> {
let state = ClientState::load().await?;
Ok(json!({ "identity": state.identity_public()? }))
}
async fn thread_create(ctx: CommandContext) -> Result<Value> {
let mut state = ClientState::load().await?;
if state.identity.is_none() {
let handle = opt_str(&ctx, "as").unwrap_or("agent").to_string();
state.identity = Some(AgentKeypair::generate(handle).export);
}
let keypair = state.keypair()?;
let thread_id = ThreadId::new();
let invite = Invite::new(thread_id.clone());
let thread_secret = new_thread_secret(thread_id.clone());
let request = CreateThreadRequest {
thread_id: thread_id.clone(),
invite_verifier: invite.verifier(),
creator: keypair.identity(),
creator_signature: keypair.sign_json(&thread_id.0)?,
};
let response: CreateThreadResponse = post_json(&ctx, "/v1/threads", &request).await?;
state.threads.insert(
thread_id.0.clone(),
StoredThread {
thread_id: thread_id.clone(),
secret: Some(thread_secret),
last_seq: 0,
},
);
state.save().await?;
Ok(json!({
"thread_id": response.thread_id,
"invite": invite.expose(),
"first_member": response.first_member,
"warning": "invite is shown once; send it out of band"
}))
}
async fn thread_join(ctx: CommandContext) -> Result<Value> {
let invite = Invite::parse(arg_str(&ctx, "invite")?)?;
let mut state = ClientState::load().await?;
if state.identity.is_none() {
let handle = opt_str(&ctx, "as").unwrap_or("agent").to_string();
state.identity = Some(AgentKeypair::generate(handle).export);
}
let keypair = state.keypair()?;
let request = JoinThreadRequest {
thread_id: invite.thread_id.clone(),
invite_secret: invite.secret,
agent: keypair.identity(),
agent_signature: keypair.sign_json(&keypair.identity())?,
};
let response: JoinThreadResponse = post_json(
&ctx,
&format!("/v1/threads/{}/join", invite.thread_id.0),
&request,
)
.await?;
state
.threads
.entry(invite.thread_id.0.clone())
.or_insert(StoredThread {
thread_id: invite.thread_id,
secret: None,
last_seq: 0,
});
state.save().await?;
Ok(json!(response))
}
async fn join_approve(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
let join_id = Uuid::parse_str(arg_str(&ctx, "join_id")?)?;
let state = ClientState::load().await?;
let keypair = state.keypair()?;
let secret = state.thread_secret(thread)?;
let joins: Vec<PendingJoin> = get_json(&ctx, &format!("/v1/threads/{thread}/joins")).await?;
let join = joins
.into_iter()
.find(|join| join.join_id == join_id)
.ok_or_else(|| anyhow!("join not found: {join_id}"))?;
let welcome_json = serde_json::to_vec(secret)?;
let (ciphertext, nonce) =
encrypt_for_agent(&keypair, &join.agent.encryption_public_key, &welcome_json)?;
let welcome = build_signed_envelope(
&keypair,
ThreadId(thread.to_string()),
secret.epoch,
MessageKind::Welcome,
ciphertext,
nonce,
None,
)?;
let request = ApproveJoinRequest {
approver: keypair.identity(),
welcome,
};
let value: Value = post_json(
&ctx,
&format!("/v1/threads/{thread}/joins/{join_id}/approve"),
&request,
)
.await?;
Ok(value)
}
async fn peers(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
get_json(&ctx, &format!("/v1/threads/{thread}/peers")).await
}
async fn send(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
let text = if let Some(text) = opt_str(&ctx, "text") {
text.to_string()
} else if let Some(path) = opt_str(&ctx, "image") {
let bytes = tokio::fs::read(path)
.await
.with_context(|| format!("read {path}"))?;
format!(
"[encrypted artifact placeholder] path={} sha256={} bytes={}",
path,
sha256_b64(&bytes),
bytes.len()
)
} else {
return Err(anyhow!("send requires --text or --image"));
};
let state = ClientState::load().await?;
let keypair = state.keypair()?;
let secret = state.thread_secret(thread)?;
let (ciphertext, nonce) = encrypt_for_thread(&secret.thread_key, text.as_bytes())?;
let envelope = build_signed_envelope(
&keypair,
ThreadId(thread.to_string()),
secret.epoch,
MessageKind::Text,
ciphertext,
nonce,
None::<ArtifactManifest>,
)?;
let response: SendMessageResponse = post_json(
&ctx,
&format!("/v1/threads/{thread}/messages"),
&SendMessageRequest { envelope },
)
.await?;
Ok(json!(response))
}
async fn mailbox_fetch(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
let mut state = ClientState::load().await?;
let after = state.threads.get(thread).map(|t| t.last_seq).unwrap_or(0);
let response: FetchMessagesResponse = get_json(
&ctx,
&format!("/v1/threads/{thread}/messages?after={after}"),
)
.await?;
let mut fetched = 0_u64;
for envelope in response.messages {
if let Some(seq) = envelope.seq {
state
.threads
.entry(thread.to_string())
.or_insert(StoredThread {
thread_id: ThreadId(thread.to_string()),
secret: None,
last_seq: 0,
})
.last_seq = seq;
}
state.mailbox.insert(
envelope.envelope_id.to_string(),
LocalMailboxMessage::fetched(envelope),
);
fetched += 1;
}
state.save().await?;
Ok(json!({ "thread_id": thread, "fetched": fetched }))
}
async fn mailbox_inspect(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
let message_id = arg_str(&ctx, "message_id")?;
let mut state = ClientState::load().await?;
let msg = state
.mailbox
.remove(message_id)
.ok_or_else(|| anyhow!("message not found: {message_id}"))?;
let inspected =
if msg.envelope.kind == MessageKind::Welcome && state.thread_secret(thread).is_err() {
verify_envelope(&msg.envelope)?;
let keypair = state.keypair()?;
let bytes = decrypt_from_agent(
&keypair,
&msg.envelope.sender.encryption_public_key,
&msg.envelope.nonce,
&msg.envelope.ciphertext,
)?;
let welcome_secret: ThreadSecret = serde_json::from_slice(&bytes)?;
state.threads.insert(
thread.to_string(),
StoredThread {
thread_id: ThreadId(thread.to_string()),
secret: Some(welcome_secret),
last_seq: msg.envelope.seq.unwrap_or(0),
},
);
msg.verify()?
.quarantine("welcome envelope imported thread key; plaintext not released")?
} else {
let secret = state.thread_secret(thread)?.clone();
msg.inspect(&secret)?
};
let decision = inspected.inspection.clone();
state.mailbox.insert(message_id.to_string(), inspected);
state.save().await?;
Ok(json!({ "message_id": message_id, "inspection": decision }))
}
async fn mailbox_release(ctx: CommandContext) -> Result<Value> {
transition_message(ctx, MailboxState::Released).await
}
async fn mailbox_quarantine(ctx: CommandContext) -> Result<Value> {
transition_message(ctx, MailboxState::Quarantined).await
}
async fn transition_message(ctx: CommandContext, to: MailboxState) -> Result<Value> {
let message_id = arg_str(&ctx, "message_id")?;
let mut state = ClientState::load().await?;
let msg = state
.mailbox
.remove(message_id)
.ok_or_else(|| anyhow!("message not found: {message_id}"))?;
let transitioned = match to {
MailboxState::Released => msg.release()?,
MailboxState::Quarantined => msg.quarantine("manual quarantine")?,
_ => return Err(anyhow!("unsupported transition")),
};
let state_name = transitioned.state.clone();
state.mailbox.insert(message_id.to_string(), transitioned);
state.save().await?;
Ok(json!({ "message_id": message_id, "state": state_name }))
}
async fn bridge(ctx: CommandContext) -> Result<Value> {
let thread = arg_str(&ctx, "thread")?;
let state = ClientState::load().await?;
let messages: Vec<Value> = state
.mailbox
.values()
.filter(|msg| msg.envelope.thread_id.0 == thread && msg.state == MailboxState::Released)
.map(|msg| {
json!({
"from": msg.envelope.sender.handle,
"seq": msg.envelope.seq,
"content": msg.plaintext,
"trust": "remote_untrusted_user_content"
})
})
.collect();
Ok(json!({
"provider": opt_str(&ctx, "provider").unwrap_or("codex"),
"mode": opt_str(&ctx, "mode").unwrap_or("guarded"),
"messages": messages
}))
}
async fn get_json<T: serde::de::DeserializeOwned>(ctx: &CommandContext, path: &str) -> Result<T> {
let url = format!("{}{}", server(ctx), path);
let builder = reqwest::Client::new().get(url);
let builder = sign_request(builder, "GET", path, b"").await?;
let response = builder.send().await?;
parse_response(response).await
}
async fn post_json<T: serde::Serialize, R: serde::de::DeserializeOwned>(
ctx: &CommandContext,
path: &str,
body: &T,
) -> Result<R> {
let url = format!("{}{}", server(ctx), path);
let bytes = serde_json::to_vec(body)?;
let builder = reqwest::Client::new()
.post(url)
.header("content-type", "application/json")
.body(bytes.clone());
let builder = sign_request(builder, "POST", path, &bytes).await?;
let response = builder.send().await?;
parse_response(response).await
}
async fn sign_request(
builder: reqwest::RequestBuilder,
method: &str,
path: &str,
body: &[u8],
) -> Result<reqwest::RequestBuilder> {
let state = ClientState::load().await?;
let Ok(keypair) = state.keypair() else {
return Ok(builder);
};
let timestamp = Utc::now().timestamp().to_string();
let nonce = Uuid::new_v4().to_string();
let canonical_path = path.split('?').next().unwrap_or(path);
let payload = format!(
"{}\n{}\n{}\n{}\n{}",
method,
canonical_path,
sha256_b64(body),
timestamp,
nonce
);
let signature = keypair.sign_bytes(payload.as_bytes())?;
Ok(builder
.header("x-f8s-agent-id", keypair.identity().agent_id)
.header("x-f8s-timestamp", timestamp)
.header("x-f8s-nonce", nonce)
.header("x-f8s-signature", signature))
}
async fn parse_response<T: serde::de::DeserializeOwned>(response: reqwest::Response) -> Result<T> {
let status = response.status();
if !status.is_success() {
let api_error = response.json::<ApiError>().await.unwrap_or(ApiError {
code: "HTTP_ERROR".to_string(),
message: status.to_string(),
});
return Err(anyhow!("{}: {}", api_error.code, api_error.message));
}
Ok(response.json().await?)
}
fn server(ctx: &CommandContext) -> String {
opt_str(ctx, "server")
.map(ToString::to_string)
.or_else(|| std::env::var("F8S_SERVER").ok())
.unwrap_or_else(|| DEFAULT_SERVER.to_string())
}
fn arg_str<'a>(ctx: &'a CommandContext, name: &str) -> Result<&'a str> {
ctx.args
.get(name)
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("missing argument: {name}"))
}
fn opt_str<'a>(ctx: &'a CommandContext, name: &str) -> Option<&'a str> {
ctx.options.get(name).and_then(Value::as_str)
}
fn field(name: &'static str, description: &'static str, required: bool) -> FieldMeta {
FieldMeta {
name,
cli_name: name.replace('_', "-"),
description: Some(description),
field_type: FieldType::String,
required,
default: None,
alias: None,
deprecated: false,
env_name: None,
}
}
fn opt(name: &'static str, description: &'static str) -> FieldMeta {
field(name, description, false)
}
fn server_opt() -> FieldMeta {
opt(
"server",
"Worker base URL. If omitted, f8s uses F8S_SERVER or local development at http://127.0.0.1:8787",
)
}
fn server_options() -> Vec<FieldMeta> {
vec![server_opt()]
}