1use chrono::Utc;
2use f8s_core::{
3 ApiError, ApproveJoinRequest, CreateThreadRequest, CreateThreadResponse, FetchMessagesResponse,
4 Invite, JoinStatus, JoinThreadRequest, JoinThreadResponse, MemberRole, Peer, PendingJoin,
5 SendMessageRequest, SendMessageResponse, ThreadId, sha256_b64, verify_bytes_signature,
6 verify_envelope, verify_json_signature,
7};
8use uuid::Uuid;
9use wasm_bindgen::JsValue;
10use worker::*;
11
/// Name of the Durable Object namespace binding that the proxy helpers look up
/// to reach the per-thread object.
const THREADS: &str = "THREADS";
13
14#[event(fetch)]
15pub async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
16 Router::new()
17 .post_async("/v1/threads", |mut req, ctx| async move {
18 let body: CreateThreadRequest = req.json().await?;
19 proxy_json(ctx, &body.thread_id.0, req.method(), req.path(), &body).await
20 })
21 .post_async("/v1/threads/:thread/join", |req, ctx| async move {
22 proxy_existing(ctx, req).await
23 })
24 .get_async("/v1/threads/:thread/joins", |req, ctx| async move {
25 proxy_existing(ctx, req).await
26 })
27 .post_async(
28 "/v1/threads/:thread/joins/:join/approve",
29 |req, ctx| async move { proxy_existing(ctx, req).await },
30 )
31 .get_async("/v1/threads/:thread/peers", |req, ctx| async move {
32 proxy_existing(ctx, req).await
33 })
34 .post_async("/v1/threads/:thread/messages", |req, ctx| async move {
35 proxy_existing(ctx, req).await
36 })
37 .get_async("/v1/threads/:thread/messages", |req, ctx| async move {
38 proxy_existing(ctx, req).await
39 })
40 .post_async(
41 "/v1/threads/:thread/artifacts/presign",
42 |_req, _ctx| async move {
43 error(
44 "ARTIFACTS_NOT_CONFIGURED",
45 "R2 presigning is not configured in this alpha worker",
46 501,
47 )
48 },
49 )
50 .run(req, env)
51 .await
52}
53
54async fn proxy_existing(ctx: RouteContext<()>, req: Request) -> Result<Response> {
55 let thread = ctx
56 .param("thread")
57 .ok_or_else(|| Error::RustError("missing thread parameter".to_string()))?
58 .to_string();
59 let ns = ctx.durable_object(THREADS)?;
60 let stub = ns.get_by_name(&thread)?;
61 stub.fetch_with_request(req).await
62}
63
64async fn proxy_json<T: serde::Serialize>(
65 ctx: RouteContext<()>,
66 thread: &str,
67 method: worker::Method,
68 path: String,
69 body: &T,
70) -> Result<Response> {
71 let ns = ctx.durable_object(THREADS)?;
72 let stub = ns.get_by_name(thread)?;
73 let mut init = RequestInit::new();
74 init.with_method(method);
75 let headers = Headers::new();
76 headers.set("content-type", "application/json")?;
77 init.with_headers(headers);
78 init.with_body(Some(JsValue::from_str(&serde_json::to_string(body)?)));
79 let req = Request::new_with_init(&format!("https://f8s.internal{path}"), &init)?;
80 stub.fetch_with_request(req).await
81}
82
/// Everything persisted for one thread; loaded and saved as a single value
/// under the storage key `"thread"`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ThreadState {
    /// Identifier supplied in the create request.
    thread_id: ThreadId,
    /// Value that `Invite::verifier()` must reproduce for a join to be accepted.
    invite_verifier: String,
    /// Time the thread was created.
    created_at: chrono::DateTime<Utc>,
    /// Current members; the creator is inserted first with the `Owner` role.
    members: Vec<Peer>,
    /// Join requests; approved entries stay in the list with their status updated.
    joins: Vec<PendingJoin>,
    /// Stored envelopes, each stamped with a sequence number before being pushed.
    messages: Vec<f8s_core::Envelope>,
    /// Sequence number to assign to the next stored envelope; starts at 1.
    next_seq: u64,
}
93
/// Durable Object that serves the API of a single thread and keeps its
/// `ThreadState` in the object's storage.
#[durable_object]
pub struct ThreadObject {
    // Handle to this object's state; its storage holds the serialized thread.
    state: State,
    // Environment handed to `new`; stored but not read anywhere in this file.
    _env: Env,
}
99
100impl DurableObject for ThreadObject {
101 fn new(state: State, env: Env) -> Self {
102 Self { state, _env: env }
103 }
104
105 async fn fetch(&self, mut req: Request) -> Result<Response> {
106 let path = req.path();
107 let method = req.method();
108
109 if method == worker::Method::Post && path == "/v1/threads" {
110 let body: CreateThreadRequest = req.json().await?;
111 return self.create_thread(body).await;
112 }
113
114 let thread_id = extract_thread_id(&path)?;
115
116 if method == worker::Method::Post && path.ends_with("/join") {
117 let body: JoinThreadRequest = req.json().await?;
118 return self.join_thread(thread_id, body).await;
119 }
120 if method == worker::Method::Get && path.ends_with("/joins") {
121 return self.list_joins(&req).await;
122 }
123 if method == worker::Method::Post && path.contains("/joins/") && path.ends_with("/approve")
124 {
125 let join_id = path
126 .split("/joins/")
127 .nth(1)
128 .and_then(|rest| rest.split('/').next())
129 .and_then(|id| Uuid::parse_str(id).ok())
130 .ok_or_else(|| Error::RustError("bad join id".to_string()))?;
131 let body: ApproveJoinRequest = req.json().await?;
132 return self.approve_join(join_id, body).await;
133 }
134 if method == worker::Method::Get && path.ends_with("/peers") {
135 return self.peers(&req).await;
136 }
137 if method == worker::Method::Post && path.ends_with("/messages") {
138 let body: SendMessageRequest = req.json().await?;
139 return self.send_message(body).await;
140 }
141 if method == worker::Method::Get && path.ends_with("/messages") {
142 let after = req
143 .url()?
144 .query_pairs()
145 .find(|(key, _)| key == "after")
146 .and_then(|(_, value)| value.parse::<u64>().ok())
147 .unwrap_or(0);
148 return self.fetch_messages(&req, thread_id, after).await;
149 }
150
151 error("NOT_FOUND", "route not found", 404)
152 }
153}
154
155impl ThreadObject {
156 async fn create_thread(&self, body: CreateThreadRequest) -> Result<Response> {
157 if verify_json_signature(
158 &body.creator.signing_public_key,
159 &body.thread_id.0,
160 &body.creator_signature,
161 )
162 .is_err()
163 {
164 return error("BAD_SIGNATURE", "creator signature is invalid", 403);
165 }
166 if self.load().await?.is_some() {
167 return error("THREAD_EXISTS", "thread already exists", 409);
168 }
169 let now = Utc::now();
170 let peer = Peer {
171 member: body.creator.clone(),
172 role: MemberRole::Owner,
173 joined_at: now,
174 epoch: 1,
175 };
176 let state = ThreadState {
177 thread_id: body.thread_id.clone(),
178 invite_verifier: body.invite_verifier,
179 created_at: now,
180 members: vec![peer],
181 joins: vec![],
182 messages: vec![],
183 next_seq: 1,
184 };
185 self.save(&state).await?;
186 Response::from_json(&CreateThreadResponse {
187 thread_id: body.thread_id,
188 first_member: body.creator,
189 created_at: now,
190 })
191 }
192
193 async fn join_thread(&self, thread_id: ThreadId, body: JoinThreadRequest) -> Result<Response> {
194 let mut state = self.require_state().await?;
195 if verify_json_signature(
196 &body.agent.signing_public_key,
197 &body.agent,
198 &body.agent_signature,
199 )
200 .is_err()
201 {
202 return error("BAD_SIGNATURE", "join signature is invalid", 403);
203 }
204 if body.thread_id != thread_id {
205 return error(
206 "THREAD_MISMATCH",
207 "request thread does not match route",
208 400,
209 );
210 }
211 let invite = Invite {
212 thread_id,
213 secret: body.invite_secret,
214 };
215 if invite.verifier() != state.invite_verifier {
216 return error("INVALID_INVITE", "invite verifier did not match", 403);
217 }
218 if state
219 .members
220 .iter()
221 .any(|peer| peer.member.agent_id == body.agent.agent_id)
222 {
223 return Response::from_json(&JoinThreadResponse {
224 join_id: Uuid::new_v4(),
225 status: JoinStatus::Approved,
226 });
227 }
228 let join = PendingJoin {
229 join_id: Uuid::new_v4(),
230 agent: body.agent,
231 requested_at: Utc::now(),
232 status: JoinStatus::Pending,
233 };
234 let response = JoinThreadResponse {
235 join_id: join.join_id,
236 status: JoinStatus::Pending,
237 };
238 state.joins.push(join);
239 self.save(&state).await?;
240 Response::from_json(&response)
241 }
242
243 async fn list_joins(&self, req: &Request) -> Result<Response> {
244 let state = self.require_state().await?;
245 if !verify_member_request(&state, req, &sha256_b64(b"")) {
246 return error("BAD_SIGNATURE", "member request signature is invalid", 403);
247 }
248 Response::from_json(&state.joins)
249 }
250
251 async fn approve_join(&self, join_id: Uuid, body: ApproveJoinRequest) -> Result<Response> {
252 let mut state = self.require_state().await?;
253 if verify_envelope(&body.welcome).is_err() {
254 return error(
255 "BAD_SIGNATURE",
256 "welcome envelope signature is invalid",
257 403,
258 );
259 }
260 if !state
261 .members
262 .iter()
263 .any(|peer| peer.member.agent_id == body.approver.agent_id)
264 {
265 return error("NOT_MEMBER", "approver is not a member", 403);
266 }
267 let join = state
268 .joins
269 .iter_mut()
270 .find(|join| join.join_id == join_id)
271 .ok_or_else(|| Error::RustError("join not found".to_string()))?;
272 join.status = JoinStatus::Approved;
273 state.members.push(Peer {
274 member: join.agent.clone(),
275 role: MemberRole::Member,
276 joined_at: Utc::now(),
277 epoch: 1,
278 });
279 let mut welcome = body.welcome;
280 welcome.seq = Some(state.next_seq);
281 state.next_seq += 1;
282 state.messages.push(welcome.clone());
283 self.save(&state).await?;
284 Response::from_json(&serde_json::json!({
285 "join_id": join_id,
286 "status": "approved",
287 "welcome_seq": welcome.seq
288 }))
289 }
290
291 async fn peers(&self, req: &Request) -> Result<Response> {
292 let state = self.require_state().await?;
293 if !verify_member_request(&state, req, &sha256_b64(b"")) {
294 return error("BAD_SIGNATURE", "member request signature is invalid", 403);
295 }
296 Response::from_json(&state.members)
297 }
298
299 async fn send_message(&self, body: SendMessageRequest) -> Result<Response> {
300 let mut state = self.require_state().await?;
301 if verify_envelope(&body.envelope).is_err() {
302 return error("BAD_SIGNATURE", "envelope signature is invalid", 403);
303 }
304 if !state
305 .members
306 .iter()
307 .any(|peer| peer.member.agent_id == body.envelope.sender.agent_id)
308 {
309 return error("NOT_MEMBER", "sender is not a member", 403);
310 }
311 let mut envelope = body.envelope;
312 envelope.seq = Some(state.next_seq);
313 state.next_seq += 1;
314 let response = SendMessageResponse {
315 seq: envelope.seq.unwrap_or_default(),
316 envelope_id: envelope.envelope_id,
317 };
318 state.messages.push(envelope);
319 self.save(&state).await?;
320 Response::from_json(&response)
321 }
322
323 async fn fetch_messages(
324 &self,
325 req: &Request,
326 thread_id: ThreadId,
327 after: u64,
328 ) -> Result<Response> {
329 let state = self.require_state().await?;
330 if !verify_member_request(&state, req, &sha256_b64(b"")) {
331 return error("BAD_SIGNATURE", "member request signature is invalid", 403);
332 }
333 let messages = state
334 .messages
335 .into_iter()
336 .filter(|message| message.seq.unwrap_or_default() > after)
337 .collect();
338 Response::from_json(&FetchMessagesResponse {
339 thread_id,
340 messages,
341 })
342 }
343
344 async fn load(&self) -> Result<Option<ThreadState>> {
345 self.state.storage().get("thread").await
346 }
347
348 async fn require_state(&self) -> Result<ThreadState> {
349 self.load()
350 .await?
351 .ok_or_else(|| Error::RustError("thread does not exist".to_string()))
352 }
353
354 async fn save(&self, state: &ThreadState) -> Result<()> {
355 self.state.storage().put("thread", state).await
356 }
357}
358
359fn extract_thread_id(path: &str) -> Result<ThreadId> {
360 path.split('/')
361 .nth(3)
362 .map(|id| ThreadId(id.to_string()))
363 .ok_or_else(|| Error::RustError("missing thread id".to_string()))
364}
365
366fn verify_member_request(state: &ThreadState, req: &Request, body_hash: &str) -> bool {
367 let agent_id = match req.headers().get("x-f8s-agent-id").ok().flatten() {
368 Some(value) => value,
369 None => return false,
370 };
371 let timestamp = match req.headers().get("x-f8s-timestamp").ok().flatten() {
372 Some(value) => value,
373 None => return false,
374 };
375 let nonce = match req.headers().get("x-f8s-nonce").ok().flatten() {
376 Some(value) => value,
377 None => return false,
378 };
379 let signature = match req.headers().get("x-f8s-signature").ok().flatten() {
380 Some(value) => value,
381 None => return false,
382 };
383 let Some(peer) = state
384 .members
385 .iter()
386 .find(|peer| peer.member.agent_id == agent_id)
387 else {
388 return false;
389 };
390 let payload = format!(
391 "{}\n{}\n{}\n{}\n{}",
392 req.method().as_ref(),
393 req.path(),
394 body_hash,
395 timestamp,
396 nonce
397 );
398 verify_bytes_signature(
399 &peer.member.signing_public_key,
400 payload.as_bytes(),
401 &signature,
402 )
403 .is_ok()
404}
405
406fn error(code: &str, message: &str, status: u16) -> Result<Response> {
407 Ok(Response::from_json(&ApiError {
408 code: code.to_string(),
409 message: message.to_string(),
410 })?
411 .with_status(status))
412}