use chrono::Utc;
use f8s_core::{
ApiError, ApproveJoinRequest, CreateThreadRequest, CreateThreadResponse, FetchMessagesResponse,
Invite, JoinStatus, JoinThreadRequest, JoinThreadResponse, MemberRole, Peer, PendingJoin,
SendMessageRequest, SendMessageResponse, ThreadId, sha256_b64, verify_bytes_signature,
verify_envelope, verify_json_signature,
};
use uuid::Uuid;
use wasm_bindgen::JsValue;
use worker::*;
/// Name handed to `RouteContext::durable_object` to look up the Durable Object
/// namespace that hosts one object per thread.
// NOTE(review): presumably has to match the binding name declared in the
// Worker's deployment configuration — confirm.
const THREADS: &str = "THREADS";
/// Worker entry point: registers the public `/v1/threads` routes and lets the
/// router dispatch the incoming request.
///
/// Every thread-scoped route is forwarded untouched to the Durable Object
/// named after the `:thread` path parameter. Thread creation has no thread id
/// in its URL, so the id is read from the JSON body and the body is re-sent to
/// the matching object. Artifact presigning is a stub that always answers 501.
#[event(fetch)]
pub async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
    let router = Router::new()
        .post_async("/v1/threads", |mut request, route| async move {
            // Reading the body consumes it, so it is re-serialized for the
            // Durable Object by `proxy_json`.
            let payload: CreateThreadRequest = request.json().await?;
            let method = request.method();
            let path = request.path();
            proxy_json(route, &payload.thread_id.0, method, path, &payload).await
        })
        .post_async("/v1/threads/:thread/join", |request, route| async move {
            proxy_existing(route, request).await
        })
        .get_async("/v1/threads/:thread/joins", |request, route| async move {
            proxy_existing(route, request).await
        })
        .post_async(
            "/v1/threads/:thread/joins/:join/approve",
            |request, route| async move { proxy_existing(route, request).await },
        )
        .get_async("/v1/threads/:thread/peers", |request, route| async move {
            proxy_existing(route, request).await
        })
        .post_async("/v1/threads/:thread/messages", |request, route| async move {
            proxy_existing(route, request).await
        })
        .get_async("/v1/threads/:thread/messages", |request, route| async move {
            proxy_existing(route, request).await
        })
        .post_async(
            "/v1/threads/:thread/artifacts/presign",
            |_request, _route| async move {
                error(
                    "ARTIFACTS_NOT_CONFIGURED",
                    "R2 presigning is not configured in this alpha worker",
                    501,
                )
            },
        );

    router.run(req, env).await
}
async fn proxy_existing(ctx: RouteContext<()>, req: Request) -> Result<Response> {
let thread = ctx
.param("thread")
.ok_or_else(|| Error::RustError("missing thread parameter".to_string()))?
.to_string();
let ns = ctx.durable_object(THREADS)?;
let stub = ns.get_by_name(&thread)?;
stub.fetch_with_request(req).await
}
/// Sends `body` as a JSON request to the Durable Object named `thread`.
///
/// Used when the original request body has already been consumed: a fresh
/// request is built with the given `method`, a `content-type:
/// application/json` header, and `path` appended to the internal placeholder
/// origin `https://f8s.internal`.
///
/// # Errors
/// Propagates failures from the namespace lookup, header construction, JSON
/// serialization, request construction and the forwarded fetch.
async fn proxy_json<T>(
    ctx: RouteContext<()>,
    thread: &str,
    method: worker::Method,
    path: String,
    body: &T,
) -> Result<Response>
where
    T: serde::Serialize,
{
    let namespace = ctx.durable_object(THREADS)?;
    let target = namespace.get_by_name(thread)?;

    let json_headers = Headers::new();
    json_headers.set("content-type", "application/json")?;
    let payload = JsValue::from_str(&serde_json::to_string(body)?);

    let mut init = RequestInit::new();
    init.with_method(method)
        .with_headers(json_headers)
        .with_body(Some(payload));

    let url = format!("https://f8s.internal{path}");
    let forwarded = Request::new_with_init(&url, &init)?;
    target.fetch_with_request(forwarded).await
}
/// Everything persisted for one thread. The whole struct is read and written
/// as a single value under the `"thread"` storage key by `ThreadObject::load`
/// and `ThreadObject::save`.
// NOTE(review): because messages live inside this one value, every send
// rewrites the full history; Durable Object storage has a per-value size
// limit — confirm this is acceptable beyond the alpha.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ThreadState {
// Identifier taken from the create request.
thread_id: ThreadId,
// Value a joiner's `Invite::verifier()` must equal to be admitted to the queue.
invite_verifier: String,
// Timestamp captured when the thread was created.
created_at: chrono::DateTime<Utc>,
// Current members; starts with the creator as `Owner`, approved joiners are appended.
members: Vec<Peer>,
// Join requests; entries stay in the list after approval with their status updated.
joins: Vec<PendingJoin>,
// Stored envelopes in append order, each stamped with a sequence number.
messages: Vec<f8s_core::Envelope>,
// Sequence number the next stored envelope will receive; starts at 1.
next_seq: u64,
}
/// Durable Object holding the state of a single thread; the router addresses
/// it by thread id via `get_by_name`.
#[durable_object]
pub struct ThreadObject {
// Durable Object state handle; its storage holds the persisted `ThreadState`.
state: State,
// Worker environment passed to `new`; not read by any code visible in this file.
_env: Env,
}
impl DurableObject for ThreadObject {
fn new(state: State, env: Env) -> Self {
Self { state, _env: env }
}
async fn fetch(&self, mut req: Request) -> Result<Response> {
let path = req.path();
let method = req.method();
if method == worker::Method::Post && path == "/v1/threads" {
let body: CreateThreadRequest = req.json().await?;
return self.create_thread(body).await;
}
let thread_id = extract_thread_id(&path)?;
if method == worker::Method::Post && path.ends_with("/join") {
let body: JoinThreadRequest = req.json().await?;
return self.join_thread(thread_id, body).await;
}
if method == worker::Method::Get && path.ends_with("/joins") {
return self.list_joins(&req).await;
}
if method == worker::Method::Post && path.contains("/joins/") && path.ends_with("/approve")
{
let join_id = path
.split("/joins/")
.nth(1)
.and_then(|rest| rest.split('/').next())
.and_then(|id| Uuid::parse_str(id).ok())
.ok_or_else(|| Error::RustError("bad join id".to_string()))?;
let body: ApproveJoinRequest = req.json().await?;
return self.approve_join(join_id, body).await;
}
if method == worker::Method::Get && path.ends_with("/peers") {
return self.peers(&req).await;
}
if method == worker::Method::Post && path.ends_with("/messages") {
let body: SendMessageRequest = req.json().await?;
return self.send_message(body).await;
}
if method == worker::Method::Get && path.ends_with("/messages") {
let after = req
.url()?
.query_pairs()
.find(|(key, _)| key == "after")
.and_then(|(_, value)| value.parse::<u64>().ok())
.unwrap_or(0);
return self.fetch_messages(&req, thread_id, after).await;
}
error("NOT_FOUND", "route not found", 404)
}
}
impl ThreadObject {
/// Creates the thread with the request's creator as its first member.
///
/// Responds 403 `BAD_SIGNATURE` when the creator's signature over the thread
/// id does not verify, and 409 `THREAD_EXISTS` when state is already stored.
/// On success the initial state (creator as `Owner`, no joins, no messages,
/// next sequence number 1) is saved and a `CreateThreadResponse` is returned.
///
/// # Errors
/// Propagates storage and response-serialization failures.
async fn create_thread(&self, body: CreateThreadRequest) -> Result<Response> {
    let signature_ok = verify_json_signature(
        &body.creator.signing_public_key,
        &body.thread_id.0,
        &body.creator_signature,
    )
    .is_ok();
    if !signature_ok {
        return error("BAD_SIGNATURE", "creator signature is invalid", 403);
    }

    let existing = self.load().await?;
    if existing.is_some() {
        return error("THREAD_EXISTS", "thread already exists", 409);
    }

    // One timestamp is shared by the thread, the owner's membership and the
    // response so all three agree.
    let created_at = Utc::now();
    let owner = Peer {
        member: body.creator.clone(),
        role: MemberRole::Owner,
        joined_at: created_at,
        epoch: 1,
    };
    let initial = ThreadState {
        thread_id: body.thread_id.clone(),
        invite_verifier: body.invite_verifier,
        created_at,
        members: vec![owner],
        joins: Vec::new(),
        messages: Vec::new(),
        next_seq: 1,
    };
    self.save(&initial).await?;

    let reply = CreateThreadResponse {
        thread_id: body.thread_id,
        first_member: body.creator,
        created_at,
    };
    Response::from_json(&reply)
}
/// Records a request to join the thread, pending approval by a member.
///
/// Checks, in order: the agent's signature over its own identity (403
/// `BAD_SIGNATURE`), that the body's thread id matches the route (400
/// `THREAD_MISMATCH`), and that the invite secret produces the stored
/// verifier (403 `INVALID_INVITE`).
///
/// An agent that is already a member gets an `Approved` response carrying a
/// freshly generated join id, and nothing is stored. Otherwise a new
/// `PendingJoin` is appended and a `Pending` response is returned. Repeated
/// requests from the same agent each add another entry.
///
/// # Errors
/// Fails when the thread does not exist, and propagates storage and
/// response-serialization failures.
async fn join_thread(&self, thread_id: ThreadId, body: JoinThreadRequest) -> Result<Response> {
    let mut state = self.require_state().await?;

    let signature_ok = verify_json_signature(
        &body.agent.signing_public_key,
        &body.agent,
        &body.agent_signature,
    )
    .is_ok();
    if !signature_ok {
        return error("BAD_SIGNATURE", "join signature is invalid", 403);
    }

    if body.thread_id != thread_id {
        return error(
            "THREAD_MISMATCH",
            "request thread does not match route",
            400,
        );
    }

    let presented = Invite {
        thread_id,
        secret: body.invite_secret,
    };
    if presented.verifier() != state.invite_verifier {
        return error("INVALID_INVITE", "invite verifier did not match", 403);
    }

    let already_member = state
        .members
        .iter()
        .any(|peer| peer.member.agent_id == body.agent.agent_id);
    if already_member {
        return Response::from_json(&JoinThreadResponse {
            join_id: Uuid::new_v4(),
            status: JoinStatus::Approved,
        });
    }

    let join_id = Uuid::new_v4();
    state.joins.push(PendingJoin {
        join_id,
        agent: body.agent,
        requested_at: Utc::now(),
        status: JoinStatus::Pending,
    });
    self.save(&state).await?;

    Response::from_json(&JoinThreadResponse {
        join_id,
        status: JoinStatus::Pending,
    })
}
async fn list_joins(&self, req: &Request) -> Result<Response> {
let state = self.require_state().await?;
if !verify_member_request(&state, req, &sha256_b64(b"")) {
return error("BAD_SIGNATURE", "member request signature is invalid", 403);
}
Response::from_json(&state.joins)
}
async fn approve_join(&self, join_id: Uuid, body: ApproveJoinRequest) -> Result<Response> {
let mut state = self.require_state().await?;
if verify_envelope(&body.welcome).is_err() {
return error(
"BAD_SIGNATURE",
"welcome envelope signature is invalid",
403,
);
}
if !state
.members
.iter()
.any(|peer| peer.member.agent_id == body.approver.agent_id)
{
return error("NOT_MEMBER", "approver is not a member", 403);
}
let join = state
.joins
.iter_mut()
.find(|join| join.join_id == join_id)
.ok_or_else(|| Error::RustError("join not found".to_string()))?;
join.status = JoinStatus::Approved;
state.members.push(Peer {
member: join.agent.clone(),
role: MemberRole::Member,
joined_at: Utc::now(),
epoch: 1,
});
let mut welcome = body.welcome;
welcome.seq = Some(state.next_seq);
state.next_seq += 1;
state.messages.push(welcome.clone());
self.save(&state).await?;
Response::from_json(&serde_json::json!({
"join_id": join_id,
"status": "approved",
"welcome_seq": welcome.seq
}))
}
async fn peers(&self, req: &Request) -> Result<Response> {
let state = self.require_state().await?;
if !verify_member_request(&state, req, &sha256_b64(b"")) {
return error("BAD_SIGNATURE", "member request signature is invalid", 403);
}
Response::from_json(&state.members)
}
/// Appends a signed envelope from a member to the thread's message log.
///
/// Responds 403 `BAD_SIGNATURE` when the envelope does not verify and 403
/// `NOT_MEMBER` when its sender's agent id is not in the member list. On
/// success the envelope is stamped with the next sequence number, stored, and
/// the sequence number and envelope id are returned.
///
/// # Errors
/// Fails when the thread does not exist, and propagates storage and
/// response-serialization failures.
async fn send_message(&self, body: SendMessageRequest) -> Result<Response> {
    let mut state = self.require_state().await?;

    if verify_envelope(&body.envelope).is_err() {
        return error("BAD_SIGNATURE", "envelope signature is invalid", 403);
    }
    // NOTE(review): membership is matched on `agent_id` only. Confirm that
    // `verify_envelope` ties the signing key to that id.
    let sender_is_member = state
        .members
        .iter()
        .any(|peer| peer.member.agent_id == body.envelope.sender.agent_id);
    if !sender_is_member {
        return error("NOT_MEMBER", "sender is not a member", 403);
    }

    // Claim the next sequence number before the envelope is stored.
    let seq = state.next_seq;
    state.next_seq += 1;

    let mut stamped = body.envelope;
    stamped.seq = Some(seq);
    let receipt = SendMessageResponse {
        seq,
        envelope_id: stamped.envelope_id,
    };

    state.messages.push(stamped);
    self.save(&state).await?;
    Response::from_json(&receipt)
}
/// Returns every stored message whose sequence number is greater than `after`
/// to an authenticated member.
///
/// The request must carry a valid member signature computed over the hash of
/// an empty body; otherwise the response is 403 `BAD_SIGNATURE`. Messages
/// without a sequence number are never returned. `thread_id` is echoed back
/// in the response.
///
/// # Errors
/// Fails when the thread does not exist or the response cannot be serialized.
async fn fetch_messages(
    &self,
    req: &Request,
    thread_id: ThreadId,
    after: u64,
) -> Result<Response> {
    let state = self.require_state().await?;

    let authorized = verify_member_request(&state, req, &sha256_b64(b""));
    if !authorized {
        return error("BAD_SIGNATURE", "member request signature is invalid", 403);
    }

    let newer = state
        .messages
        .into_iter()
        .filter(|message| matches!(message.seq, Some(seq) if seq > after))
        .collect();

    Response::from_json(&FetchMessagesResponse {
        thread_id,
        messages: newer,
    })
}
/// Reads the persisted thread state from this object's storage; `None` means
/// nothing has been stored yet.
async fn load(&self) -> Result<Option<ThreadState>> {
    // Single storage key under which the whole thread state lives.
    const STATE_KEY: &str = "thread";
    self.state.storage().get(STATE_KEY).await
}
async fn require_state(&self) -> Result<ThreadState> {
self.load()
.await?
.ok_or_else(|| Error::RustError("thread does not exist".to_string()))
}
/// Writes the complete thread state to this object's storage, replacing
/// whatever was stored before.
async fn save(&self, state: &ThreadState) -> Result<()> {
    // Single storage key under which the whole thread state lives.
    const STATE_KEY: &str = "thread";
    self.state.storage().put(STATE_KEY, state).await
}
}
/// Extracts the thread id from a `/v1/threads/<id>/…` path.
///
/// The id is the fourth `/`-separated piece of the path (index 3, because the
/// leading slash produces an empty first piece).
///
/// # Errors
/// Returns `Error::RustError("missing thread id")` when the path has no such
/// segment or the segment is empty, as in `/v1/threads/` or
/// `/v1/threads//join`.
fn extract_thread_id(path: &str) -> Result<ThreadId> {
    path.split('/')
        .nth(3)
        // An empty segment is a missing id, not a thread named "".
        .filter(|id| !id.is_empty())
        .map(|id| ThreadId(id.to_string()))
        .ok_or_else(|| Error::RustError("missing thread id".to_string()))
}
/// Checks that a request was signed by a current member of the thread.
///
/// The caller identifies itself through the `x-f8s-agent-id`,
/// `x-f8s-timestamp`, `x-f8s-nonce` and `x-f8s-signature` headers. The
/// signature must verify, under the member's stored signing key, over these
/// five values joined by newlines: HTTP method, request path, `body_hash`,
/// timestamp, nonce.
///
/// Returns `false` when any header is missing or unreadable, when the agent
/// id is not a member, or when the signature does not verify.
// NOTE(review): the timestamp and nonce are only part of the signed payload;
// nothing in this function checks freshness or nonce reuse, so a captured
// request would verify again — confirm whether replay is handled elsewhere.
fn verify_member_request(state: &ThreadState, req: &Request, body_hash: &str) -> bool {
    let header = |name: &str| req.headers().get(name).ok().flatten();

    let (Some(agent_id), Some(timestamp), Some(nonce), Some(signature)) = (
        header("x-f8s-agent-id"),
        header("x-f8s-timestamp"),
        header("x-f8s-nonce"),
        header("x-f8s-signature"),
    ) else {
        return false;
    };

    let Some(signer) = state
        .members
        .iter()
        .find(|peer| peer.member.agent_id == agent_id)
    else {
        return false;
    };

    let method = req.method();
    let path = req.path();
    let signed_payload = format!(
        "{}\n{path}\n{body_hash}\n{timestamp}\n{nonce}",
        method.as_ref()
    );

    verify_bytes_signature(
        &signer.member.signing_public_key,
        signed_payload.as_bytes(),
        &signature,
    )
    .is_ok()
}
/// Builds a JSON error response: an `ApiError` body carrying `code` and
/// `message`, sent with HTTP status `status`.
///
/// # Errors
/// Fails only when the body cannot be serialized.
fn error(code: &str, message: &str, status: u16) -> Result<Response> {
    let payload = ApiError {
        code: code.to_string(),
        message: message.to_string(),
    };
    let response = Response::from_json(&payload)?;
    Ok(response.with_status(status))
}