1use adenosine::created_at_now;
2use adenosine::identifiers::{AtUri, Did, Nsid, Ticker, Tid};
3use anyhow::{anyhow, Result};
4use askama::Template;
5use log::{debug, error, info, warn};
6use rouille::{router, Request, Response};
7use serde_json::{json, Value};
8use std::fmt;
9use std::io::Read;
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::sync::Mutex;
13
14mod db;
15mod db_bsky;
16mod web;
17
18use adenosine::app_bsky;
19use adenosine::com_atproto;
20use adenosine::crypto::KeyPair;
21use adenosine::ipld::{ipld_into_json_value, json_value_into_ipld};
22use adenosine::plc;
23use adenosine::plc::DidDocMeta;
24use adenosine::repo::{Mutation, RepoStore};
25use db::AtpDatabase;
26use db_bsky::*;
27use web::*;
28
29#[derive(Debug)]
30pub enum XrpcError {
31 BadRequest(String),
32 NotFound(String),
33 Forbidden(String),
34 MutexPoisoned,
35}
36
37impl std::error::Error for XrpcError {}
38
39impl fmt::Display for XrpcError {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::BadRequest(msg) | Self::NotFound(msg) | Self::Forbidden(msg) => {
43 write!(f, "{msg}")
44 }
45 Self::MutexPoisoned => write!(f, "service mutex poisoned"),
46 }
47 }
48}
49
50pub struct AtpService {
51 pub repo: RepoStore,
52 pub atp_db: AtpDatabase,
53 pub pds_keypair: KeyPair,
54 pub tid_gen: Ticker,
55 pub config: AtpServiceConfig,
56}
57
58#[derive(Clone, Debug)]
59pub struct AtpServiceConfig {
60 pub listen_host_port: String,
61 pub public_url: String,
62 pub registration_domain: Option<String>,
63 pub invite_code: Option<String>,
64 pub homepage_handle: Option<String>,
65}
66
67impl Default for AtpServiceConfig {
68 fn default() -> Self {
69 AtpServiceConfig {
70 listen_host_port: "localhost:3030".to_string(),
71 public_url: "http://localhost".to_string(),
72 registration_domain: None,
73 invite_code: None,
74 homepage_handle: None,
75 }
76 }
77}
78
79fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response {
81 match resp {
82 Ok(val) => Response::json(&val),
83 Err(e) => {
84 let msg = e.to_string();
85 let code = match e.downcast_ref::<XrpcError>() {
86 Some(XrpcError::BadRequest(_)) => 400,
87 Some(XrpcError::NotFound(_)) => 404,
88 Some(XrpcError::Forbidden(_)) => 403,
89 Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
91 None => 500,
92 };
93 warn!("HTTP {}: {}", code, msg);
94 Response::json(&json!({ "message": msg })).with_status_code(code)
95 }
96 }
97}
98
99fn web_wrap(resp: Result<String>) -> Response {
102 match resp {
103 Ok(val) => Response::html(val),
104 Err(e) => {
105 let msg = e.to_string();
106 let code = match e.downcast_ref::<XrpcError>() {
107 Some(XrpcError::BadRequest(_)) => 400,
108 Some(XrpcError::NotFound(_)) => 404,
109 Some(XrpcError::Forbidden(_)) => 403,
110 Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
112 None => 500,
113 };
114 warn!("HTTP {}: {}", code, msg);
115 let view = ErrorView {
116 domain: "ERROR".to_string(),
117 status_code: code,
118 error_message: msg,
119 };
120 Response::html(view.render().unwrap())
121 }
122 }
123}
124
125impl AtpService {
126 pub fn new(
127 blockstore_db_path: &PathBuf,
128 atp_db_path: &PathBuf,
129 keypair: KeyPair,
130 config: AtpServiceConfig,
131 ) -> Result<Self> {
132 Ok(AtpService {
133 repo: RepoStore::open(blockstore_db_path)?,
134 atp_db: AtpDatabase::open(atp_db_path)?,
135 pds_keypair: keypair,
136 tid_gen: Ticker::new(),
137 config,
138 })
139 }
140
141 pub fn new_ephemeral() -> Result<Self> {
142 Ok(AtpService {
143 repo: RepoStore::open_ephemeral()?,
144 atp_db: AtpDatabase::open_ephemeral()?,
145 pds_keypair: KeyPair::new_random(),
146 tid_gen: Ticker::new(),
147 config: AtpServiceConfig::default(),
148 })
149 }
150
151 pub fn run_server(self) -> Result<()> {
152 let config = self.config.clone();
153 let srv = Mutex::new(self);
154
155 let log_ok = |req: &Request, resp: &Response, elap: std::time::Duration| {
156 info!(
157 "{} {} ({}, {:?})",
158 req.method(),
159 req.raw_url(),
160 resp.status_code,
161 elap
162 );
163 };
164 let log_err = |req: &Request, elap: std::time::Duration| {
165 error!(
166 "HTTP handler panicked: {} {} ({:?})",
167 req.method(),
168 req.raw_url(),
169 elap
170 );
171 };
172
173 rouille::start_server(config.listen_host_port, move |request| {
174 rouille::log_custom(request, log_ok, log_err, || {
175 router!(request,
176 (GET) ["/"] => {
178 if let Some(ref handle) = config.homepage_handle {
179 web_wrap(account_view_handler(&srv, handle, request))
180 } else {
181 web_wrap(home_view_handler(&srv, request))
182 }
183 },
184 (GET) ["/.well-known/did.json"] => {
185 match did_doc_view_handler(&srv, request) {
186 Ok(resp) => resp,
187 Err(e) => web_wrap(Err(e)),
188 }
189 },
190 (GET) ["/about"] => {
191 let host = request.header("Host").unwrap_or("localhost");
192 let view = AboutView { domain: host.to_string() };
193 Response::html(view.render().unwrap())
194 },
195 (GET) ["/u/{handle}", handle: String] => {
196 web_wrap(account_view_handler(&srv, &handle, request))
197 },
198 (GET) ["/u/{handle}/post/{tid}", handle: String, tid: Tid] => {
199 web_wrap(thread_view_handler(&srv, &handle, &tid, request))
200 },
201 (GET) ["/at/{did}", did: Did] => {
202 web_wrap(repo_view_handler(&srv, &did, request))
203 },
204 (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => {
205 web_wrap(collection_view_handler(&srv, &did, &collection, request))
206 },
207 (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
208 web_wrap(record_view_handler(&srv, &did, &collection, &tid, request))
209 },
210 (GET) ["/static/adenosine.css"] => {
212 Response::from_data("text/css", include_str!("../templates/adenosine.css"))
213 },
214 (GET) ["/static/favicon.png"] => {
215 Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec())
216 },
217 (GET) ["/static/logo_128.png"] => {
218 Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec())
219 },
220 (GET) ["/robots.txt"] => {
221 Response::text(include_str!("../templates/robots.txt"))
222 },
223 (POST) ["/xrpc/{endpoint}", endpoint: String] => {
225 xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request))
226 },
227 (GET) ["/xrpc/com.atproto.sync.getRepo"] => {
228 match xrpc_get_repo_handler(&srv, request) {
230 Ok(car_bytes) => Response::from_data("application/octet-stream", car_bytes),
231 Err(e) => {
232 let msg = e.to_string();
233 let code = match e.downcast_ref::<XrpcError>() {
234 Some(XrpcError::BadRequest(_)) => 400,
235 Some(XrpcError::NotFound(_)) => 404,
236 Some(XrpcError::Forbidden(_)) => 403,
237 Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
239 None => 500,
240 };
241 warn!("HTTP {}: {}", code, msg);
242 Response::json(&json!({ "message": msg })).with_status_code(code)
243 }
244 }
245 },
246 (GET) ["/xrpc/{endpoint}", endpoint: String] => {
247 xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request))
248 },
249 _ => web_wrap(Err(XrpcError::NotFound("unknown URL pattern".to_string()).into())),
250 )
251 })
252 });
253 }
254}
255
256fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
257 Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(
258 "require '{key}' query parameter"
259 )))?)
260}
261
262fn xrpc_check_auth_header(
264 srv: &mut AtpService,
265 request: &Request,
266 req_did: Option<&Did>,
267) -> Result<Did> {
268 let header = request
269 .header("Authorization")
270 .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
271 if !header.starts_with("Bearer ") {
272 Err(XrpcError::Forbidden("require bearer token".to_string()))?;
273 }
274 let jwt = header.split(' ').nth(1).unwrap();
275 let did = match srv.atp_db.check_auth_token(jwt)? {
276 Some(did) => did,
277 None => Err(XrpcError::Forbidden("session token not found".to_string()))?,
278 };
279 let did = Did::from_str(&did)?;
280 if req_did.is_some() && Some(&did) != req_did {
281 Err(XrpcError::Forbidden(
282 "can only modify your own repo".to_string(),
283 ))?;
284 }
285 Ok(did)
286}
287
288fn xrpc_get_handler(
289 srv: &Mutex<AtpService>,
290 method: &str,
291 request: &Request,
292) -> Result<serde_json::Value> {
293 match method {
294 "com.atproto.server.getAccountsConfig" => {
295 let srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
296 let mut avail_domains = vec![];
297 if let Some(domain) = &srv.config.registration_domain {
298 avail_domains.push(domain)
299 }
300 Ok(
302 json!({"availableUserDomains": avail_domains, "inviteCodeRequired": srv.config.invite_code.is_some()}),
303 )
304 }
305 "com.atproto.repo.getRecord" => {
306 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
307 let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
308 let rkey = Tid::from_str(&xrpc_required_param(request, "rkey")?)?;
309 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
310 let key = format!("{collection}/{rkey}");
311 match srv.repo.get_atp_record(&did, &collection, &rkey) {
312 Ok(Some(ipld)) => Ok(ipld_into_json_value(ipld)),
314 Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
315 "could not find record: {key}"
316 )))),
317 Err(e) => Err(e),
318 }
319 }
320 "com.atproto.sync.getHead" => {
321 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
322 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
323 srv.repo
324 .lookup_commit(&did)?
325 .map(|v| json!({ "root": v.to_string() }))
326 .ok_or(XrpcError::NotFound(format!("no repository found for DID: {did}")).into())
327 }
328 "com.atproto.repo.listRecords" => {
329 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
333 let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
334 let mut record_list: Vec<Value> = vec![];
335 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
336 let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
337 let last_commit = srv.repo.get_commit(commit_cid)?;
338 let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
339 let prefix = format!("{collection}/");
340 for (mst_key, cid) in full_map.iter() {
341 if mst_key.starts_with(&prefix) {
343 let record = srv.repo.get_ipld(cid)?;
344 record_list.push(json!({
345 "uri": format!("at://{did}{mst_key}"),
346 "cid": cid.to_string(),
347 "value": ipld_into_json_value(record),
348 }));
349 }
350 }
351 Ok(json!({ "records": record_list }))
352 }
353 "com.atproto.handle.resolve" => {
354 let handle = xrpc_required_param(request, "handle")?;
355 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
356 match srv.atp_db.resolve_handle(&handle)? {
357 Some(did) => Ok(json!({"did": did.to_string()})),
358 None => Err(XrpcError::NotFound(format!(
359 "could not resolve handle internally: {handle}"
360 )))?,
361 }
362 }
363 "com.atproto.repo.describe" => {
364 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
365
366 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
367 let did_doc = srv.atp_db.get_did_doc(&did)?;
368 let collections: Vec<String> = srv.repo.collections(&did)?;
369 let desc = com_atproto::repo::Describe {
370 handle: did.to_string(), did: did.to_string(),
372 didDoc: did_doc,
373 collections,
374 handleIsCorrect: true,
375 };
376 Ok(json!(desc))
377 }
378 "app.bsky.actor.getProfile" => {
380 let did = Did::from_str(&xrpc_required_param(request, "actor")?)?;
382 let mut srv = srv.lock().unwrap();
383 Ok(json!(bsky_get_profile(&mut srv, &did)?))
385 }
386 "app.bsky.actor.search" => {
387 let _term = xrpc_required_param(request, "term")?;
389 Ok(json!({"users": []}))
390 }
391 "app.bsky.actor.searchTypeahead" => {
392 let _term = xrpc_required_param(request, "term")?;
394 Ok(json!({"users": []}))
395 }
396 "app.bsky.actor.getSuggestions" => {
397 Ok(json!({"actors": []}))
399 }
400 "app.bsky.feed.getAuthorFeed" => {
401 let did = Did::from_str(&xrpc_required_param(request, "actor")?)?;
403 let mut srv = srv.lock().unwrap();
404 Ok(json!(bsky_get_author_feed(&mut srv, &did)?))
405 }
406 "app.bsky.feed.getTimeline" => {
407 let mut srv = srv.lock().unwrap();
408 let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
409 Ok(json!(bsky_get_timeline(&mut srv, auth_did)?))
410 }
411 "app.bsky.feed.getPostThread" => {
412 let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?;
413 let mut srv = srv.lock().unwrap();
414 Ok(json!(bsky_get_thread(&mut srv, &uri, None)?))
415 }
416 "app.bsky.graph.getMemberships" => {
417 let _actor = Did::from_str(&xrpc_required_param(request, "actor")?)?;
420 Ok(json!({"memberships": []}))
421 }
422 "app.bsky.notification.getCount" => {
423 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
425 let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
426 Ok(json!({"count": 0}))
427 }
428 "app.bsky.notification.listNotifications" => {
429 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
431 let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
432 Ok(json!({"notifications": []}))
433 }
434 _ => Err(anyhow!(XrpcError::NotFound(format!(
435 "XRPC endpoint handler not found: {method}"
436 )))),
437 }
438}
439
440fn xrpc_get_repo_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Vec<u8>> {
441 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
442 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
443 let commit_cid = srv.repo.lookup_commit(&did)?.unwrap();
445 srv.repo.export_car(&commit_cid, None)
446}
447
448pub fn create_account(
449 srv: &mut AtpService,
450 req: &com_atproto::AccountRequest,
451 create_did_plc: bool,
452) -> Result<com_atproto::Session> {
453 if srv.atp_db.account_exists(&req.handle, &req.email)? {
455 Err(XrpcError::BadRequest(
456 "handle or email already exists".to_string(),
457 ))?;
458 };
459
460 debug!("trying to create new account: {}", &req.handle);
461
462 let (did, did_doc) = if create_did_plc {
463 let create_op = plc::CreateOp::new(
465 req.handle.clone(),
466 srv.config.public_url.clone(),
467 &srv.pds_keypair,
468 req.recoveryKey.clone(),
469 );
470 create_op.verify_self()?;
471 let did = create_op.did_plc();
472 let did_doc = create_op.did_doc();
473 (did, did_doc)
474 } else {
475 let did = Did::from_str(&format!("did:web:{}", req.handle))?;
476 let signing_key = srv.pds_keypair.pubkey().to_did_key();
477 let recovery_key = req.recoveryKey.clone().unwrap_or(signing_key.clone());
478 let meta = DidDocMeta {
479 did: did.clone(),
480 user_url: format!("https://{}", req.handle),
481 service_url: srv.config.public_url.clone(),
482 recovery_didkey: recovery_key,
483 signing_didkey: signing_key,
484 };
485 (did, meta.did_doc())
486 };
487
488 let recovery_key = req
490 .recoveryKey
491 .clone()
492 .unwrap_or(srv.pds_keypair.pubkey().to_did_key());
493 srv.atp_db
494 .create_account(&did, &req.handle, &req.password, &req.email, &recovery_key)?;
495 srv.atp_db.put_did_doc(&did, &did_doc)?;
496
497 let keypair = srv.pds_keypair.clone();
499 let empty_map_cid = srv.repo.mst_from_map(&Default::default())?;
500 let _commit_cid = srv.repo.write_commit(&did, None, empty_map_cid, &keypair)?;
501
502 let sess = srv
503 .atp_db
504 .create_session(&req.handle, &req.password, &keypair)?;
505 Ok(sess)
506}
507
508fn xrpc_post_handler(
509 srv: &Mutex<AtpService>,
510 method: &str,
511 request: &Request,
512) -> Result<serde_json::Value> {
513 match method {
514 "com.atproto.server.createAccount" => {
515 let req: com_atproto::AccountRequest = rouille::input::json_input(request)
517 .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {e}")))?;
518 let mut srv = srv.lock().unwrap();
520 if let Some(ref domain) = srv.config.registration_domain {
521 if !req.handle.ends_with(domain) {
523 Err(XrpcError::BadRequest(format!(
524 "handle is not under registration domain ({domain})"
525 )))?;
526 }
527 } else {
528 Err(XrpcError::BadRequest(
529 "account registration is disabled on this PDS".to_string(),
530 ))?;
531 };
532 if srv.config.invite_code.is_some() && srv.config.invite_code != req.inviteCode {
533 Err(XrpcError::Forbidden(
534 "a valid invite code is required".to_string(),
535 ))?;
536 };
537 let sess = create_account(&mut srv, &req, true)?;
538 Ok(json!(sess))
539 }
540 "com.atproto.server.createSession" => {
541 let req: com_atproto::SessionRequest = rouille::input::json_input(request)
542 .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {e}")))?;
543 let mut srv = srv.lock().unwrap();
544 let keypair = srv.pds_keypair.clone();
545 Ok(json!(srv.atp_db.create_session(
546 &req.identifier,
547 &req.password,
548 &keypair
549 )?))
550 }
551 "com.atproto.server.refreshSession" => {
552 let mut srv = srv.lock().unwrap();
554 let did = xrpc_check_auth_header(&mut srv, request, None)?;
555 let header = request
556 .header("Authorization")
557 .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
558 if !header.starts_with("Bearer ") {
559 Err(XrpcError::Forbidden("require bearer token".to_string()))?;
560 }
561 let jwt = header.split(' ').nth(1).unwrap();
562 let handle = srv
563 .atp_db
564 .resolve_did(&did)?
565 .expect("DID matches to a handle");
566
567 Ok(json!(com_atproto::Session {
568 did: did.to_string(),
569 name: handle,
570 email: None,
571 accessJwt: jwt.to_string(),
572 refreshJwt: jwt.to_string(),
573 }))
574 }
575 "com.atproto.server.deleteSession" => {
576 let mut srv = srv.lock().unwrap();
577 let _did = xrpc_check_auth_header(&mut srv, request, None)?;
578 let header = request
579 .header("Authorization")
580 .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
581 if !header.starts_with("Bearer ") {
582 Err(XrpcError::Forbidden("require bearer token".to_string()))?;
583 }
584 let jwt = header.split(' ').nth(1).expect("JWT in header");
585 if !srv.atp_db.delete_session(jwt)? {
586 Err(anyhow!(
587 "session token not found, even after using for auth"
588 ))?
589 };
590 Ok(json!({}))
591 }
592 "com.atproto.repo.applyWrites" => {
593 let batch: com_atproto::repo::BatchWriteBody = rouille::input::json_input(request)?;
594 let did = Did::from_str(&batch.repo)?;
596 let mut srv = srv.lock().unwrap();
597 let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
598 let mut mutations: Vec<Mutation> = Default::default();
599 for w in batch.writes.iter() {
600 let m = match w.op_type.as_str() {
601 "create" => Mutation::Create(
602 Nsid::from_str(&w.collection)?,
603 w.rkey
605 .as_ref()
606 .map(|t| Tid::from_str(t).unwrap())
607 .unwrap_or_else(|| srv.tid_gen.next_tid()),
608 json_value_into_ipld(w.value.clone().unwrap()),
609 ),
610 "update" => Mutation::Update(
611 Nsid::from_str(&w.collection)?,
612 Tid::from_str(w.rkey.as_ref().unwrap())?,
613 json_value_into_ipld(w.value.clone().unwrap()),
614 ),
615 "delete" => Mutation::Delete(
616 Nsid::from_str(&w.collection)?,
617 Tid::from_str(w.rkey.as_ref().unwrap())?,
618 ),
619 _ => Err(anyhow!("unhandled operation type: {}", w.op_type))?,
620 };
621 mutations.push(m);
622 }
623 let keypair = srv.pds_keypair.clone();
624 srv.repo.mutate_repo(&did, &mutations, &keypair)?;
625 bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
626 Ok(json!({}))
627 }
628 "com.atproto.repo.createRecord" => {
629 let create: com_atproto::repo::CreateRecord = rouille::input::json_input(request)?;
631 let did = Did::from_str(&create.did)?;
632 let collection = Nsid::from_str(&create.collection)?;
633 let mut srv = srv.lock().unwrap();
634 let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
635 let mutations: Vec<Mutation> = vec![Mutation::Create(
636 collection,
637 srv.tid_gen.next_tid(),
638 json_value_into_ipld(create.record),
639 )];
640 let keypair = srv.pds_keypair.clone();
641 srv.repo.mutate_repo(&did, &mutations, &keypair)?;
642 bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
643 Ok(json!({}))
644 }
645 "com.atproto.repo.putRecord" => {
646 let put: com_atproto::repo::PutRecord = rouille::input::json_input(request)?;
648 let did = Did::from_str(&put.did)?;
649 let collection = Nsid::from_str(&put.collection)?;
650 let tid = Tid::from_str(&put.rkey)?;
651 let mut srv = srv.lock().unwrap();
652 let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
653
654 let mutations: Vec<Mutation> = vec![Mutation::Update(
655 collection,
656 tid,
657 json_value_into_ipld(put.record),
658 )];
659 let keypair = srv.pds_keypair.clone();
660 srv.repo.mutate_repo(&did, &mutations, &keypair)?;
661 bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
662 Ok(json!({}))
663 }
664 "com.atproto.repo.deleteRecord" => {
665 let delete: com_atproto::repo::DeleteRecord = rouille::input::json_input(request)?;
666 let did = Did::from_str(&delete.did)?;
667 let collection = Nsid::from_str(&delete.collection)?;
668 let tid = Tid::from_str(&delete.rkey)?;
669 let mut srv = srv.lock().unwrap();
670 let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
671
672 let mutations: Vec<Mutation> = vec![Mutation::Delete(collection, tid)];
673 let keypair = srv.pds_keypair.clone();
674 srv.repo.mutate_repo(&did, &mutations, &keypair)?;
675 bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
676 Ok(json!({}))
677 }
678 "com.atproto.sync.updateRepo" => {
679 let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
681 let mut car_bytes: Vec<u8> = Default::default();
683 request.data().unwrap().read_to_end(&mut car_bytes)?;
685 let mut srv = srv.lock().unwrap();
686 let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
687 srv.repo
688 .import_car_bytes(&car_bytes, Some(did.to_string()))?;
689 Ok(json!({}))
691 }
692 "app.bsky.actor.updateProfile" => {
694 let profile: app_bsky::ProfileRecord = rouille::input::json_input(request)?;
695 let mut srv = srv.lock().unwrap();
696 let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
697 bsky_update_profile(&mut srv, auth_did, profile)?;
698 Ok(json!({}))
699 }
700 "app.bsky.notification.updateSeen" => {
701 let mut srv = srv.lock().unwrap();
703 let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
704 Ok(json!({}))
705 }
706 _ => Err(anyhow!(XrpcError::NotFound(format!(
707 "XRPC endpoint handler not found: {method}"
708 )))),
709 }
710}
711
712fn home_view_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<String> {
713 let host = request.header("Host").unwrap_or("localhost");
714
715 let did: Option<Did> = {
717 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
719 srv.atp_db.resolve_handle(host)?
720 };
721 if did.is_some() {
722 account_view_handler(srv, host, request)
723 } else {
724 let view = GenericHomeView {
725 domain: host.to_string(),
726 };
727 Ok(view.render()?)
728 }
729}
730
731fn did_doc_view_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Response> {
732 let host = request.header("Host").unwrap_or("localhost");
733 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
734 if let Some(did) = srv.atp_db.resolve_handle(host)? {
735 if did.to_string().starts_with("did:web:") {
736 let did_doc = srv.atp_db.get_did_doc(&did)?;
737 return Ok(Response::json(&did_doc));
738 }
739 };
740 Err(XrpcError::NotFound(
741 "no did:web: account registered at this domain".to_string(),
742 ))?
743}
744
745fn account_view_handler(
747 srv: &Mutex<AtpService>,
748 handle: &str,
749 request: &Request,
750) -> Result<String> {
751 let host = request.header("Host").unwrap_or("localhost");
752 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
753 let did = srv
755 .atp_db
756 .resolve_handle(handle)?
757 .ok_or(XrpcError::NotFound(format!(
758 "no DID found for handle: {handle}"
759 )))?;
760
761 Ok(AccountView {
762 domain: host.to_string(),
763 did: did.clone(),
764 profile: bsky_get_profile_detailed(&mut srv, &did)?,
765 feed: bsky_get_author_feed(&mut srv, &did)?.feed,
766 }
767 .render()?)
768}
769
770fn thread_view_handler(
771 srv: &Mutex<AtpService>,
772 handle: &str,
773 tid: &Tid,
774 request: &Request,
775) -> Result<String> {
776 let host = request.header("Host").unwrap_or("localhost");
777 let collection = Nsid::from_str("app.bsky.feed.post")?;
778 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
779 let did = srv.atp_db.resolve_handle(handle)?.unwrap();
781
782 let uri = AtUri::from_str(&format!("at://{did}/{collection}/{tid}"))?;
784 Ok(ThreadView {
785 domain: host.to_string(),
786 did,
787 collection,
788 tid: tid.clone(),
789 post: bsky_get_thread(&mut srv, &uri, None)?.thread,
790 }
791 .render()?)
792}
793
794fn repo_view_handler(srv: &Mutex<AtpService>, did: &str, request: &Request) -> Result<String> {
795 let host = request.header("Host").unwrap_or("localhost");
796 let did = Did::from_str(did)?;
797
798 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
799 let did_doc = srv.atp_db.get_did_doc(&did)?;
800 let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
801 let commit = srv.repo.get_commit(commit_cid)?;
802 let collections: Vec<String> = srv.repo.collections(&did)?;
803 let desc = com_atproto::repo::Describe {
804 handle: did.to_string(), did: did.to_string(),
806 didDoc: did_doc,
807 collections,
808 handleIsCorrect: true,
809 };
810
811 Ok(RepoView {
812 domain: host.to_string(),
813 did,
814 commit,
815 describe: desc,
816 }
817 .render()?)
818}
819
820fn collection_view_handler(
821 srv: &Mutex<AtpService>,
822 did: &str,
823 collection: &str,
824 request: &Request,
825) -> Result<String> {
826 let host = request.header("Host").unwrap_or("localhost");
827 let did = Did::from_str(did)?;
828 let collection = Nsid::from_str(collection)?;
829
830 let mut record_list: Vec<Value> = vec![];
831 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
832 let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
833 let last_commit = srv.repo.get_commit(commit_cid)?;
834 let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
835 let prefix = format!("{collection}/");
836 for (mst_key, cid) in full_map.iter() {
837 debug!("{}", mst_key);
838 if mst_key.starts_with(&prefix) {
839 let record = srv.repo.get_ipld(cid)?;
840 record_list.push(json!({
841 "uri": format!("at://{did}{mst_key}"),
842 "tid": mst_key.split('/').nth(2).unwrap(),
843 "cid": cid,
844 "value": ipld_into_json_value(record),
845 }));
846 }
847 }
848
849 Ok(CollectionView {
850 domain: host.to_string(),
851 did,
852 collection,
853 records: record_list,
854 }
855 .render()?)
856}
857
858fn record_view_handler(
859 srv: &Mutex<AtpService>,
860 did: &str,
861 collection: &str,
862 tid: &str,
863 request: &Request,
864) -> Result<String> {
865 let host = request.header("Host").unwrap_or("localhost");
866 let did = Did::from_str(did)?;
867 let collection = Nsid::from_str(collection)?;
868 let rkey = Tid::from_str(tid)?;
869
870 let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
871 let key = format!("{collection}/{rkey}");
872 let record = match srv.repo.get_atp_record(&did, &collection, &rkey) {
873 Ok(Some(ipld)) => ipld_into_json_value(ipld),
874 Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
875 "could not find record: {key}"
876 ))))?,
877 Err(e) => Err(e)?,
878 };
879 Ok(RecordView {
880 domain: host.to_string(),
881 did,
882 collection,
883 tid: rkey,
884 record,
885 }
886 .render()?)
887}