Skip to main content

f8s_worker/
lib.rs

1use chrono::Utc;
2use f8s_core::{
3    ApiError, ApproveJoinRequest, CreateThreadRequest, CreateThreadResponse, FetchMessagesResponse,
4    Invite, JoinStatus, JoinThreadRequest, JoinThreadResponse, MemberRole, Peer, PendingJoin,
5    SendMessageRequest, SendMessageResponse, ThreadId, sha256_b64, verify_bytes_signature,
6    verify_envelope, verify_json_signature,
7};
8use uuid::Uuid;
9use wasm_bindgen::JsValue;
10use worker::*;
11
12const THREADS: &str = "THREADS";
13
14#[event(fetch)]
15pub async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
16    Router::new()
17        .post_async("/v1/threads", |mut req, ctx| async move {
18            let body: CreateThreadRequest = req.json().await?;
19            proxy_json(ctx, &body.thread_id.0, req.method(), req.path(), &body).await
20        })
21        .post_async("/v1/threads/:thread/join", |req, ctx| async move {
22            proxy_existing(ctx, req).await
23        })
24        .get_async("/v1/threads/:thread/joins", |req, ctx| async move {
25            proxy_existing(ctx, req).await
26        })
27        .post_async(
28            "/v1/threads/:thread/joins/:join/approve",
29            |req, ctx| async move { proxy_existing(ctx, req).await },
30        )
31        .get_async("/v1/threads/:thread/peers", |req, ctx| async move {
32            proxy_existing(ctx, req).await
33        })
34        .post_async("/v1/threads/:thread/messages", |req, ctx| async move {
35            proxy_existing(ctx, req).await
36        })
37        .get_async("/v1/threads/:thread/messages", |req, ctx| async move {
38            proxy_existing(ctx, req).await
39        })
40        .post_async(
41            "/v1/threads/:thread/artifacts/presign",
42            |_req, _ctx| async move {
43                error(
44                    "ARTIFACTS_NOT_CONFIGURED",
45                    "R2 presigning is not configured in this alpha worker",
46                    501,
47                )
48            },
49        )
50        .run(req, env)
51        .await
52}
53
54async fn proxy_existing(ctx: RouteContext<()>, req: Request) -> Result<Response> {
55    let thread = ctx
56        .param("thread")
57        .ok_or_else(|| Error::RustError("missing thread parameter".to_string()))?
58        .to_string();
59    let ns = ctx.durable_object(THREADS)?;
60    let stub = ns.get_by_name(&thread)?;
61    stub.fetch_with_request(req).await
62}
63
64async fn proxy_json<T: serde::Serialize>(
65    ctx: RouteContext<()>,
66    thread: &str,
67    method: worker::Method,
68    path: String,
69    body: &T,
70) -> Result<Response> {
71    let ns = ctx.durable_object(THREADS)?;
72    let stub = ns.get_by_name(thread)?;
73    let mut init = RequestInit::new();
74    init.with_method(method);
75    let headers = Headers::new();
76    headers.set("content-type", "application/json")?;
77    init.with_headers(headers);
78    init.with_body(Some(JsValue::from_str(&serde_json::to_string(body)?)));
79    let req = Request::new_with_init(&format!("https://f8s.internal{path}"), &init)?;
80    stub.fetch_with_request(req).await
81}
82
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
84struct ThreadState {
85    thread_id: ThreadId,
86    invite_verifier: String,
87    created_at: chrono::DateTime<Utc>,
88    members: Vec<Peer>,
89    joins: Vec<PendingJoin>,
90    messages: Vec<f8s_core::Envelope>,
91    next_seq: u64,
92}
93
94#[durable_object]
95pub struct ThreadObject {
96    state: State,
97    _env: Env,
98}
99
100impl DurableObject for ThreadObject {
101    fn new(state: State, env: Env) -> Self {
102        Self { state, _env: env }
103    }
104
105    async fn fetch(&self, mut req: Request) -> Result<Response> {
106        let path = req.path();
107        let method = req.method();
108
109        if method == worker::Method::Post && path == "/v1/threads" {
110            let body: CreateThreadRequest = req.json().await?;
111            return self.create_thread(body).await;
112        }
113
114        let thread_id = extract_thread_id(&path)?;
115
116        if method == worker::Method::Post && path.ends_with("/join") {
117            let body: JoinThreadRequest = req.json().await?;
118            return self.join_thread(thread_id, body).await;
119        }
120        if method == worker::Method::Get && path.ends_with("/joins") {
121            return self.list_joins(&req).await;
122        }
123        if method == worker::Method::Post && path.contains("/joins/") && path.ends_with("/approve")
124        {
125            let join_id = path
126                .split("/joins/")
127                .nth(1)
128                .and_then(|rest| rest.split('/').next())
129                .and_then(|id| Uuid::parse_str(id).ok())
130                .ok_or_else(|| Error::RustError("bad join id".to_string()))?;
131            let body: ApproveJoinRequest = req.json().await?;
132            return self.approve_join(join_id, body).await;
133        }
134        if method == worker::Method::Get && path.ends_with("/peers") {
135            return self.peers(&req).await;
136        }
137        if method == worker::Method::Post && path.ends_with("/messages") {
138            let body: SendMessageRequest = req.json().await?;
139            return self.send_message(body).await;
140        }
141        if method == worker::Method::Get && path.ends_with("/messages") {
142            let after = req
143                .url()?
144                .query_pairs()
145                .find(|(key, _)| key == "after")
146                .and_then(|(_, value)| value.parse::<u64>().ok())
147                .unwrap_or(0);
148            return self.fetch_messages(&req, thread_id, after).await;
149        }
150
151        error("NOT_FOUND", "route not found", 404)
152    }
153}
154
155impl ThreadObject {
156    async fn create_thread(&self, body: CreateThreadRequest) -> Result<Response> {
157        if verify_json_signature(
158            &body.creator.signing_public_key,
159            &body.thread_id.0,
160            &body.creator_signature,
161        )
162        .is_err()
163        {
164            return error("BAD_SIGNATURE", "creator signature is invalid", 403);
165        }
166        if self.load().await?.is_some() {
167            return error("THREAD_EXISTS", "thread already exists", 409);
168        }
169        let now = Utc::now();
170        let peer = Peer {
171            member: body.creator.clone(),
172            role: MemberRole::Owner,
173            joined_at: now,
174            epoch: 1,
175        };
176        let state = ThreadState {
177            thread_id: body.thread_id.clone(),
178            invite_verifier: body.invite_verifier,
179            created_at: now,
180            members: vec![peer],
181            joins: vec![],
182            messages: vec![],
183            next_seq: 1,
184        };
185        self.save(&state).await?;
186        Response::from_json(&CreateThreadResponse {
187            thread_id: body.thread_id,
188            first_member: body.creator,
189            created_at: now,
190        })
191    }
192
193    async fn join_thread(&self, thread_id: ThreadId, body: JoinThreadRequest) -> Result<Response> {
194        let mut state = self.require_state().await?;
195        if verify_json_signature(
196            &body.agent.signing_public_key,
197            &body.agent,
198            &body.agent_signature,
199        )
200        .is_err()
201        {
202            return error("BAD_SIGNATURE", "join signature is invalid", 403);
203        }
204        if body.thread_id != thread_id {
205            return error(
206                "THREAD_MISMATCH",
207                "request thread does not match route",
208                400,
209            );
210        }
211        let invite = Invite {
212            thread_id,
213            secret: body.invite_secret,
214        };
215        if invite.verifier() != state.invite_verifier {
216            return error("INVALID_INVITE", "invite verifier did not match", 403);
217        }
218        if state
219            .members
220            .iter()
221            .any(|peer| peer.member.agent_id == body.agent.agent_id)
222        {
223            return Response::from_json(&JoinThreadResponse {
224                join_id: Uuid::new_v4(),
225                status: JoinStatus::Approved,
226            });
227        }
228        let join = PendingJoin {
229            join_id: Uuid::new_v4(),
230            agent: body.agent,
231            requested_at: Utc::now(),
232            status: JoinStatus::Pending,
233        };
234        let response = JoinThreadResponse {
235            join_id: join.join_id,
236            status: JoinStatus::Pending,
237        };
238        state.joins.push(join);
239        self.save(&state).await?;
240        Response::from_json(&response)
241    }
242
243    async fn list_joins(&self, req: &Request) -> Result<Response> {
244        let state = self.require_state().await?;
245        if !verify_member_request(&state, req, &sha256_b64(b"")) {
246            return error("BAD_SIGNATURE", "member request signature is invalid", 403);
247        }
248        Response::from_json(&state.joins)
249    }
250
251    async fn approve_join(&self, join_id: Uuid, body: ApproveJoinRequest) -> Result<Response> {
252        let mut state = self.require_state().await?;
253        if verify_envelope(&body.welcome).is_err() {
254            return error(
255                "BAD_SIGNATURE",
256                "welcome envelope signature is invalid",
257                403,
258            );
259        }
260        if !state
261            .members
262            .iter()
263            .any(|peer| peer.member.agent_id == body.approver.agent_id)
264        {
265            return error("NOT_MEMBER", "approver is not a member", 403);
266        }
267        let join = state
268            .joins
269            .iter_mut()
270            .find(|join| join.join_id == join_id)
271            .ok_or_else(|| Error::RustError("join not found".to_string()))?;
272        join.status = JoinStatus::Approved;
273        state.members.push(Peer {
274            member: join.agent.clone(),
275            role: MemberRole::Member,
276            joined_at: Utc::now(),
277            epoch: 1,
278        });
279        let mut welcome = body.welcome;
280        welcome.seq = Some(state.next_seq);
281        state.next_seq += 1;
282        state.messages.push(welcome.clone());
283        self.save(&state).await?;
284        Response::from_json(&serde_json::json!({
285            "join_id": join_id,
286            "status": "approved",
287            "welcome_seq": welcome.seq
288        }))
289    }
290
291    async fn peers(&self, req: &Request) -> Result<Response> {
292        let state = self.require_state().await?;
293        if !verify_member_request(&state, req, &sha256_b64(b"")) {
294            return error("BAD_SIGNATURE", "member request signature is invalid", 403);
295        }
296        Response::from_json(&state.members)
297    }
298
299    async fn send_message(&self, body: SendMessageRequest) -> Result<Response> {
300        let mut state = self.require_state().await?;
301        if verify_envelope(&body.envelope).is_err() {
302            return error("BAD_SIGNATURE", "envelope signature is invalid", 403);
303        }
304        if !state
305            .members
306            .iter()
307            .any(|peer| peer.member.agent_id == body.envelope.sender.agent_id)
308        {
309            return error("NOT_MEMBER", "sender is not a member", 403);
310        }
311        let mut envelope = body.envelope;
312        envelope.seq = Some(state.next_seq);
313        state.next_seq += 1;
314        let response = SendMessageResponse {
315            seq: envelope.seq.unwrap_or_default(),
316            envelope_id: envelope.envelope_id,
317        };
318        state.messages.push(envelope);
319        self.save(&state).await?;
320        Response::from_json(&response)
321    }
322
323    async fn fetch_messages(
324        &self,
325        req: &Request,
326        thread_id: ThreadId,
327        after: u64,
328    ) -> Result<Response> {
329        let state = self.require_state().await?;
330        if !verify_member_request(&state, req, &sha256_b64(b"")) {
331            return error("BAD_SIGNATURE", "member request signature is invalid", 403);
332        }
333        let messages = state
334            .messages
335            .into_iter()
336            .filter(|message| message.seq.unwrap_or_default() > after)
337            .collect();
338        Response::from_json(&FetchMessagesResponse {
339            thread_id,
340            messages,
341        })
342    }
343
344    async fn load(&self) -> Result<Option<ThreadState>> {
345        self.state.storage().get("thread").await
346    }
347
348    async fn require_state(&self) -> Result<ThreadState> {
349        self.load()
350            .await?
351            .ok_or_else(|| Error::RustError("thread does not exist".to_string()))
352    }
353
354    async fn save(&self, state: &ThreadState) -> Result<()> {
355        self.state.storage().put("thread", state).await
356    }
357}
358
359fn extract_thread_id(path: &str) -> Result<ThreadId> {
360    path.split('/')
361        .nth(3)
362        .map(|id| ThreadId(id.to_string()))
363        .ok_or_else(|| Error::RustError("missing thread id".to_string()))
364}
365
366fn verify_member_request(state: &ThreadState, req: &Request, body_hash: &str) -> bool {
367    let agent_id = match req.headers().get("x-f8s-agent-id").ok().flatten() {
368        Some(value) => value,
369        None => return false,
370    };
371    let timestamp = match req.headers().get("x-f8s-timestamp").ok().flatten() {
372        Some(value) => value,
373        None => return false,
374    };
375    let nonce = match req.headers().get("x-f8s-nonce").ok().flatten() {
376        Some(value) => value,
377        None => return false,
378    };
379    let signature = match req.headers().get("x-f8s-signature").ok().flatten() {
380        Some(value) => value,
381        None => return false,
382    };
383    let Some(peer) = state
384        .members
385        .iter()
386        .find(|peer| peer.member.agent_id == agent_id)
387    else {
388        return false;
389    };
390    let payload = format!(
391        "{}\n{}\n{}\n{}\n{}",
392        req.method().as_ref(),
393        req.path(),
394        body_hash,
395        timestamp,
396        nonce
397    );
398    verify_bytes_signature(
399        &peer.member.signing_public_key,
400        payload.as_bytes(),
401        &signature,
402    )
403    .is_ok()
404}
405
406fn error(code: &str, message: &str, status: u16) -> Result<Response> {
407    Ok(Response::from_json(&ApiError {
408        code: code.to_string(),
409        message: message.to_string(),
410    })?
411    .with_status(status))
412}