adenosine_pds/
lib.rs

1use adenosine::created_at_now;
2use adenosine::identifiers::{AtUri, Did, Nsid, Ticker, Tid};
3use anyhow::{anyhow, Result};
4use askama::Template;
5use log::{debug, error, info, warn};
6use rouille::{router, Request, Response};
7use serde_json::{json, Value};
8use std::fmt;
9use std::io::Read;
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::sync::Mutex;
13
14mod db;
15mod db_bsky;
16mod web;
17
18use adenosine::app_bsky;
19use adenosine::com_atproto;
20use adenosine::crypto::KeyPair;
21use adenosine::ipld::{ipld_into_json_value, json_value_into_ipld};
22use adenosine::plc;
23use adenosine::plc::DidDocMeta;
24use adenosine::repo::{Mutation, RepoStore};
25use db::AtpDatabase;
26use db_bsky::*;
27use web::*;
28
/// Error categories raised by the XRPC and web handlers in this file.
///
/// The response wrappers (`xrpc_wrap`, `web_wrap`) downcast to this type to pick an HTTP status
/// code; any other error type is reported as a 500.
#[derive(Debug)]
pub enum XrpcError {
    /// Invalid or missing request input (reported as HTTP 400).
    BadRequest(String),
    /// The requested resource or endpoint does not exist (reported as HTTP 404).
    NotFound(String),
    /// Authentication or authorization failed (reported as HTTP 403).
    Forbidden(String),
    /// The service mutex was poisoned; the response wrappers terminate the process on this.
    MutexPoisoned,
}

impl std::error::Error for XrpcError {}

impl fmt::Display for XrpcError {
    /// Writes the variant's message verbatim, or a fixed string for `MutexPoisoned`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &str = match self {
            Self::BadRequest(msg) => msg.as_str(),
            Self::NotFound(msg) => msg.as_str(),
            Self::Forbidden(msg) => msg.as_str(),
            Self::MutexPoisoned => "service mutex poisoned",
        };
        f.write_str(text)
    }
}
49
50pub struct AtpService {
51    pub repo: RepoStore,
52    pub atp_db: AtpDatabase,
53    pub pds_keypair: KeyPair,
54    pub tid_gen: Ticker,
55    pub config: AtpServiceConfig,
56}
57
/// Runtime settings for an `AtpService` instance.
#[derive(Clone, Debug)]
pub struct AtpServiceConfig {
    /// `host:port` address the HTTP server binds to.
    pub listen_host_port: String,
    /// Public URL of this service; written into DID documents of new accounts.
    pub public_url: String,
    /// Domain that new handles must end with; `None` disables account registration.
    pub registration_domain: Option<String>,
    /// When set, account creation must supply this exact invite code.
    pub invite_code: Option<String>,
    /// When set, `/` renders this handle's account page instead of the generic home page.
    pub homepage_handle: Option<String>,
}

impl Default for AtpServiceConfig {
    /// Local-only defaults: listens on `localhost:3030`, with registration, invite codes, and
    /// the homepage handle all unset.
    fn default() -> Self {
        Self {
            listen_host_port: String::from("localhost:3030"),
            public_url: String::from("http://localhost"),
            registration_domain: None,
            invite_code: None,
            homepage_handle: None,
        }
    }
}
78
79/// Helper to take an XRPC result (always a JSON object), and transform it to a rouille response
80fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response {
81    match resp {
82        Ok(val) => Response::json(&val),
83        Err(e) => {
84            let msg = e.to_string();
85            let code = match e.downcast_ref::<XrpcError>() {
86                Some(XrpcError::BadRequest(_)) => 400,
87                Some(XrpcError::NotFound(_)) => 404,
88                Some(XrpcError::Forbidden(_)) => 403,
89                // crash hard on mutex poison error
90                Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
91                None => 500,
92            };
93            warn!("HTTP {}: {}", code, msg);
94            Response::json(&json!({ "message": msg })).with_status_code(code)
95        }
96    }
97}
98
99/// Helper to take a Askama render Result and transform it to a rouille response, including more
100/// friendly HTML 404s, etc.
101fn web_wrap(resp: Result<String>) -> Response {
102    match resp {
103        Ok(val) => Response::html(val),
104        Err(e) => {
105            let msg = e.to_string();
106            let code = match e.downcast_ref::<XrpcError>() {
107                Some(XrpcError::BadRequest(_)) => 400,
108                Some(XrpcError::NotFound(_)) => 404,
109                Some(XrpcError::Forbidden(_)) => 403,
110                // crash hard on mutex poison error
111                Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
112                None => 500,
113            };
114            warn!("HTTP {}: {}", code, msg);
115            let view = ErrorView {
116                domain: "ERROR".to_string(),
117                status_code: code,
118                error_message: msg,
119            };
120            Response::html(view.render().unwrap())
121        }
122    }
123}
124
125impl AtpService {
126    pub fn new(
127        blockstore_db_path: &PathBuf,
128        atp_db_path: &PathBuf,
129        keypair: KeyPair,
130        config: AtpServiceConfig,
131    ) -> Result<Self> {
132        Ok(AtpService {
133            repo: RepoStore::open(blockstore_db_path)?,
134            atp_db: AtpDatabase::open(atp_db_path)?,
135            pds_keypair: keypair,
136            tid_gen: Ticker::new(),
137            config,
138        })
139    }
140
141    pub fn new_ephemeral() -> Result<Self> {
142        Ok(AtpService {
143            repo: RepoStore::open_ephemeral()?,
144            atp_db: AtpDatabase::open_ephemeral()?,
145            pds_keypair: KeyPair::new_random(),
146            tid_gen: Ticker::new(),
147            config: AtpServiceConfig::default(),
148        })
149    }
150
151    pub fn run_server(self) -> Result<()> {
152        let config = self.config.clone();
153        let srv = Mutex::new(self);
154
155        let log_ok = |req: &Request, resp: &Response, elap: std::time::Duration| {
156            info!(
157                "{} {} ({}, {:?})",
158                req.method(),
159                req.raw_url(),
160                resp.status_code,
161                elap
162            );
163        };
164        let log_err = |req: &Request, elap: std::time::Duration| {
165            error!(
166                "HTTP handler panicked: {} {} ({:?})",
167                req.method(),
168                req.raw_url(),
169                elap
170            );
171        };
172
173        rouille::start_server(config.listen_host_port, move |request| {
174            rouille::log_custom(request, log_ok, log_err, || {
175                router!(request,
176                    // ============= Web Interface
177                    (GET) ["/"] => {
178                        if let Some(ref handle) = config.homepage_handle {
179                            web_wrap(account_view_handler(&srv, handle, request))
180                        } else {
181                            web_wrap(home_view_handler(&srv, request))
182                        }
183                    },
184                    (GET) ["/.well-known/did.json"] => {
185                        match did_doc_view_handler(&srv, request) {
186                            Ok(resp) => resp,
187                            Err(e) => web_wrap(Err(e)),
188                        }
189                    },
190                    (GET) ["/about"] => {
191                        let host = request.header("Host").unwrap_or("localhost");
192                        let view = AboutView { domain: host.to_string() };
193                        Response::html(view.render().unwrap())
194                    },
195                    (GET) ["/u/{handle}", handle: String] => {
196                        web_wrap(account_view_handler(&srv, &handle, request))
197                    },
198                    (GET) ["/u/{handle}/post/{tid}", handle: String, tid: Tid] => {
199                        web_wrap(thread_view_handler(&srv, &handle, &tid, request))
200                    },
201                    (GET) ["/at/{did}", did: Did] => {
202                        web_wrap(repo_view_handler(&srv, &did, request))
203                    },
204                    (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => {
205                        web_wrap(collection_view_handler(&srv, &did, &collection, request))
206                    },
207                    (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
208                        web_wrap(record_view_handler(&srv, &did, &collection, &tid, request))
209                    },
210                    // ============ Static Files (compiled in to executable)
211                    (GET) ["/static/adenosine.css"] => {
212                        Response::from_data("text/css", include_str!("../templates/adenosine.css"))
213                    },
214                    (GET) ["/static/favicon.png"] => {
215                        Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec())
216                    },
217                    (GET) ["/static/logo_128.png"] => {
218                        Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec())
219                    },
220                    (GET) ["/robots.txt"] => {
221                        Response::text(include_str!("../templates/robots.txt"))
222                    },
223                    // ============ XRPC AT Protocol
224                    (POST) ["/xrpc/{endpoint}", endpoint: String] => {
225                        xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request))
226                    },
227                    (GET) ["/xrpc/com.atproto.sync.getRepo"] => {
228                        // this one endpoint returns CAR file, not JSON, so wrappers don't work
229                        match xrpc_get_repo_handler(&srv, request) {
230                            Ok(car_bytes) => Response::from_data("application/octet-stream", car_bytes),
231                            Err(e) => {
232                                let msg = e.to_string();
233                                let code = match e.downcast_ref::<XrpcError>() {
234                                    Some(XrpcError::BadRequest(_)) => 400,
235                                    Some(XrpcError::NotFound(_)) => 404,
236                                    Some(XrpcError::Forbidden(_)) => 403,
237                                    // crash hard on mutex poison error
238                                    Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
239                                    None => 500,
240                                };
241                                warn!("HTTP {}: {}", code, msg);
242                                Response::json(&json!({ "message": msg })).with_status_code(code)
243                            }
244                        }
245                    },
246                    (GET) ["/xrpc/{endpoint}", endpoint: String] => {
247                        xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request))
248                    },
249                    _ => web_wrap(Err(XrpcError::NotFound("unknown URL pattern".to_string()).into())),
250                )
251            })
252        });
253    }
254}
255
256fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
257    Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(
258        "require '{key}' query parameter"
259    )))?)
260}
261
262/// Returns DID of validated user
263fn xrpc_check_auth_header(
264    srv: &mut AtpService,
265    request: &Request,
266    req_did: Option<&Did>,
267) -> Result<Did> {
268    let header = request
269        .header("Authorization")
270        .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
271    if !header.starts_with("Bearer ") {
272        Err(XrpcError::Forbidden("require bearer token".to_string()))?;
273    }
274    let jwt = header.split(' ').nth(1).unwrap();
275    let did = match srv.atp_db.check_auth_token(jwt)? {
276        Some(did) => did,
277        None => Err(XrpcError::Forbidden("session token not found".to_string()))?,
278    };
279    let did = Did::from_str(&did)?;
280    if req_did.is_some() && Some(&did) != req_did {
281        Err(XrpcError::Forbidden(
282            "can only modify your own repo".to_string(),
283        ))?;
284    }
285    Ok(did)
286}
287
288fn xrpc_get_handler(
289    srv: &Mutex<AtpService>,
290    method: &str,
291    request: &Request,
292) -> Result<serde_json::Value> {
293    match method {
294        "com.atproto.server.getAccountsConfig" => {
295            let srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
296            let mut avail_domains = vec![];
297            if let Some(domain) = &srv.config.registration_domain {
298                avail_domains.push(domain)
299            }
300            // TODO: optional "links" object with "privacyPolicy" and "termsOfService" URLs
301            Ok(
302                json!({"availableUserDomains": avail_domains, "inviteCodeRequired": srv.config.invite_code.is_some()}),
303            )
304        }
305        "com.atproto.repo.getRecord" => {
306            let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
307            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
308            let rkey = Tid::from_str(&xrpc_required_param(request, "rkey")?)?;
309            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
310            let key = format!("{collection}/{rkey}");
311            match srv.repo.get_atp_record(&did, &collection, &rkey) {
312                // TODO: format as JSON, not text debug
313                Ok(Some(ipld)) => Ok(ipld_into_json_value(ipld)),
314                Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
315                    "could not find record: {key}"
316                )))),
317                Err(e) => Err(e),
318            }
319        }
320        "com.atproto.sync.getHead" => {
321            let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
322            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
323            srv.repo
324                .lookup_commit(&did)?
325                .map(|v| json!({ "root": v.to_string() }))
326                .ok_or(XrpcError::NotFound(format!("no repository found for DID: {did}")).into())
327        }
328        "com.atproto.repo.listRecords" => {
329            // TODO: limit, before, after, tid, reverse
330            // TODO: handle non-DID 'user'
331            // TODO: limit result set size
332            let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
333            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
334            let mut record_list: Vec<Value> = vec![];
335            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
336            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
337            let last_commit = srv.repo.get_commit(commit_cid)?;
338            let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
339            let prefix = format!("{collection}/");
340            for (mst_key, cid) in full_map.iter() {
341                //debug!("{}", mst_key);
342                if mst_key.starts_with(&prefix) {
343                    let record = srv.repo.get_ipld(cid)?;
344                    record_list.push(json!({
345                        "uri": format!("at://{did}{mst_key}"),
346                        "cid": cid.to_string(),
347                        "value": ipld_into_json_value(record),
348                    }));
349                }
350            }
351            Ok(json!({ "records": record_list }))
352        }
353        "com.atproto.handle.resolve" => {
354            let handle = xrpc_required_param(request, "handle")?;
355            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
356            match srv.atp_db.resolve_handle(&handle)? {
357                Some(did) => Ok(json!({"did": did.to_string()})),
358                None => Err(XrpcError::NotFound(format!(
359                    "could not resolve handle internally: {handle}"
360                )))?,
361            }
362        }
363        "com.atproto.repo.describe" => {
364            let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
365
366            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
367            let did_doc = srv.atp_db.get_did_doc(&did)?;
368            let collections: Vec<String> = srv.repo.collections(&did)?;
369            let desc = com_atproto::repo::Describe {
370                handle: did.to_string(), // TODO: handle?
371                did: did.to_string(),
372                didDoc: did_doc,
373                collections,
374                handleIsCorrect: true,
375            };
376            Ok(json!(desc))
377        }
378        // =========== app.bsky methods
379        "app.bsky.actor.getProfile" => {
380            // TODO did or handle
381            let did = Did::from_str(&xrpc_required_param(request, "actor")?)?;
382            let mut srv = srv.lock().unwrap();
383            // TODO: if profile doesn't exist, return a 404
384            Ok(json!(bsky_get_profile(&mut srv, &did)?))
385        }
386        "app.bsky.actor.search" => {
387            // TODO: actual implementation
388            let _term = xrpc_required_param(request, "term")?;
389            Ok(json!({"users": []}))
390        }
391        "app.bsky.actor.searchTypeahead" => {
392            // TODO: actual implementation
393            let _term = xrpc_required_param(request, "term")?;
394            Ok(json!({"users": []}))
395        }
396        "app.bsky.actor.getSuggestions" => {
397            // TODO: actual implementation
398            Ok(json!({"actors": []}))
399        }
400        "app.bsky.feed.getAuthorFeed" => {
401            // TODO did or handle
402            let did = Did::from_str(&xrpc_required_param(request, "actor")?)?;
403            let mut srv = srv.lock().unwrap();
404            Ok(json!(bsky_get_author_feed(&mut srv, &did)?))
405        }
406        "app.bsky.feed.getTimeline" => {
407            let mut srv = srv.lock().unwrap();
408            let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
409            Ok(json!(bsky_get_timeline(&mut srv, auth_did)?))
410        }
411        "app.bsky.feed.getPostThread" => {
412            let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?;
413            let mut srv = srv.lock().unwrap();
414            Ok(json!(bsky_get_thread(&mut srv, &uri, None)?))
415        }
416        "app.bsky.graph.getMemberships" => {
417            // TODO: actual implementation
418            // TODO did or handle
419            let _actor = Did::from_str(&xrpc_required_param(request, "actor")?)?;
420            Ok(json!({"memberships": []}))
421        }
422        "app.bsky.notification.getCount" => {
423            // TODO: actual implementation
424            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
425            let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
426            Ok(json!({"count": 0}))
427        }
428        "app.bsky.notification.listNotifications" => {
429            // TODO: actual implementation
430            let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
431            let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
432            Ok(json!({"notifications": []}))
433        }
434        _ => Err(anyhow!(XrpcError::NotFound(format!(
435            "XRPC endpoint handler not found: {method}"
436        )))),
437    }
438}
439
440fn xrpc_get_repo_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Vec<u8>> {
441    let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
442    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
443    // TODO: don't unwrap here
444    let commit_cid = srv.repo.lookup_commit(&did)?.unwrap();
445    srv.repo.export_car(&commit_cid, None)
446}
447
448pub fn create_account(
449    srv: &mut AtpService,
450    req: &com_atproto::AccountRequest,
451    create_did_plc: bool,
452) -> Result<com_atproto::Session> {
453    // check if account already exists (fast path, also confirmed by database schema)
454    if srv.atp_db.account_exists(&req.handle, &req.email)? {
455        Err(XrpcError::BadRequest(
456            "handle or email already exists".to_string(),
457        ))?;
458    };
459
460    debug!("trying to create new account: {}", &req.handle);
461
462    let (did, did_doc) = if create_did_plc {
463        // generate DID
464        let create_op = plc::CreateOp::new(
465            req.handle.clone(),
466            srv.config.public_url.clone(),
467            &srv.pds_keypair,
468            req.recoveryKey.clone(),
469        );
470        create_op.verify_self()?;
471        let did = create_op.did_plc();
472        let did_doc = create_op.did_doc();
473        (did, did_doc)
474    } else {
475        let did = Did::from_str(&format!("did:web:{}", req.handle))?;
476        let signing_key = srv.pds_keypair.pubkey().to_did_key();
477        let recovery_key = req.recoveryKey.clone().unwrap_or(signing_key.clone());
478        let meta = DidDocMeta {
479            did: did.clone(),
480            user_url: format!("https://{}", req.handle),
481            service_url: srv.config.public_url.clone(),
482            recovery_didkey: recovery_key,
483            signing_didkey: signing_key,
484        };
485        (did, meta.did_doc())
486    };
487
488    // register in ATP DB and generate DID doc
489    let recovery_key = req
490        .recoveryKey
491        .clone()
492        .unwrap_or(srv.pds_keypair.pubkey().to_did_key());
493    srv.atp_db
494        .create_account(&did, &req.handle, &req.password, &req.email, &recovery_key)?;
495    srv.atp_db.put_did_doc(&did, &did_doc)?;
496
497    // insert empty MST repository
498    let keypair = srv.pds_keypair.clone();
499    let empty_map_cid = srv.repo.mst_from_map(&Default::default())?;
500    let _commit_cid = srv.repo.write_commit(&did, None, empty_map_cid, &keypair)?;
501
502    let sess = srv
503        .atp_db
504        .create_session(&req.handle, &req.password, &keypair)?;
505    Ok(sess)
506}
507
508fn xrpc_post_handler(
509    srv: &Mutex<AtpService>,
510    method: &str,
511    request: &Request,
512) -> Result<serde_json::Value> {
513    match method {
514        "com.atproto.server.createAccount" => {
515            // validate account request
516            let req: com_atproto::AccountRequest = rouille::input::json_input(request)
517                .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {e}")))?;
518            // TODO: validate handle, email, recoverykey
519            let mut srv = srv.lock().unwrap();
520            if let Some(ref domain) = srv.config.registration_domain {
521                // TODO: better matching, should not allow arbitrary sub-domains
522                if !req.handle.ends_with(domain) {
523                    Err(XrpcError::BadRequest(format!(
524                        "handle is not under registration domain ({domain})"
525                    )))?;
526                }
527            } else {
528                Err(XrpcError::BadRequest(
529                    "account registration is disabled on this PDS".to_string(),
530                ))?;
531            };
532            if srv.config.invite_code.is_some() && srv.config.invite_code != req.inviteCode {
533                Err(XrpcError::Forbidden(
534                    "a valid invite code is required".to_string(),
535                ))?;
536            };
537            let sess = create_account(&mut srv, &req, true)?;
538            Ok(json!(sess))
539        }
540        "com.atproto.server.createSession" => {
541            let req: com_atproto::SessionRequest = rouille::input::json_input(request)
542                .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {e}")))?;
543            let mut srv = srv.lock().unwrap();
544            let keypair = srv.pds_keypair.clone();
545            Ok(json!(srv.atp_db.create_session(
546                &req.identifier,
547                &req.password,
548                &keypair
549            )?))
550        }
551        "com.atproto.server.refreshSession" => {
552            // actually just returns current session, because we don't implement refresh
553            let mut srv = srv.lock().unwrap();
554            let did = xrpc_check_auth_header(&mut srv, request, None)?;
555            let header = request
556                .header("Authorization")
557                .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
558            if !header.starts_with("Bearer ") {
559                Err(XrpcError::Forbidden("require bearer token".to_string()))?;
560            }
561            let jwt = header.split(' ').nth(1).unwrap();
562            let handle = srv
563                .atp_db
564                .resolve_did(&did)?
565                .expect("DID matches to a handle");
566
567            Ok(json!(com_atproto::Session {
568                did: did.to_string(),
569                name: handle,
570                email: None,
571                accessJwt: jwt.to_string(),
572                refreshJwt: jwt.to_string(),
573            }))
574        }
575        "com.atproto.server.deleteSession" => {
576            let mut srv = srv.lock().unwrap();
577            let _did = xrpc_check_auth_header(&mut srv, request, None)?;
578            let header = request
579                .header("Authorization")
580                .ok_or(XrpcError::Forbidden("require auth header".to_string()))?;
581            if !header.starts_with("Bearer ") {
582                Err(XrpcError::Forbidden("require bearer token".to_string()))?;
583            }
584            let jwt = header.split(' ').nth(1).expect("JWT in header");
585            if !srv.atp_db.delete_session(jwt)? {
586                Err(anyhow!(
587                    "session token not found, even after using for auth"
588                ))?
589            };
590            Ok(json!({}))
591        }
592        "com.atproto.repo.applyWrites" => {
593            let batch: com_atproto::repo::BatchWriteBody = rouille::input::json_input(request)?;
594            // TODO: validate edits against schemas
595            let did = Did::from_str(&batch.repo)?;
596            let mut srv = srv.lock().unwrap();
597            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
598            let mut mutations: Vec<Mutation> = Default::default();
599            for w in batch.writes.iter() {
600                let m = match w.op_type.as_str() {
601                    "create" => Mutation::Create(
602                        Nsid::from_str(&w.collection)?,
603                        // TODO: user input unwrap here
604                        w.rkey
605                            .as_ref()
606                            .map(|t| Tid::from_str(t).unwrap())
607                            .unwrap_or_else(|| srv.tid_gen.next_tid()),
608                        json_value_into_ipld(w.value.clone().unwrap()),
609                    ),
610                    "update" => Mutation::Update(
611                        Nsid::from_str(&w.collection)?,
612                        Tid::from_str(w.rkey.as_ref().unwrap())?,
613                        json_value_into_ipld(w.value.clone().unwrap()),
614                    ),
615                    "delete" => Mutation::Delete(
616                        Nsid::from_str(&w.collection)?,
617                        Tid::from_str(w.rkey.as_ref().unwrap())?,
618                    ),
619                    _ => Err(anyhow!("unhandled operation type: {}", w.op_type))?,
620                };
621                mutations.push(m);
622            }
623            let keypair = srv.pds_keypair.clone();
624            srv.repo.mutate_repo(&did, &mutations, &keypair)?;
625            bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
626            Ok(json!({}))
627        }
628        "com.atproto.repo.createRecord" => {
629            // TODO: validate edits against schemas
630            let create: com_atproto::repo::CreateRecord = rouille::input::json_input(request)?;
631            let did = Did::from_str(&create.did)?;
632            let collection = Nsid::from_str(&create.collection)?;
633            let mut srv = srv.lock().unwrap();
634            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
635            let mutations: Vec<Mutation> = vec![Mutation::Create(
636                collection,
637                srv.tid_gen.next_tid(),
638                json_value_into_ipld(create.record),
639            )];
640            let keypair = srv.pds_keypair.clone();
641            srv.repo.mutate_repo(&did, &mutations, &keypair)?;
642            bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
643            Ok(json!({}))
644        }
645        "com.atproto.repo.putRecord" => {
646            // TODO: validate edits against schemas
647            let put: com_atproto::repo::PutRecord = rouille::input::json_input(request)?;
648            let did = Did::from_str(&put.did)?;
649            let collection = Nsid::from_str(&put.collection)?;
650            let tid = Tid::from_str(&put.rkey)?;
651            let mut srv = srv.lock().unwrap();
652            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
653
654            let mutations: Vec<Mutation> = vec![Mutation::Update(
655                collection,
656                tid,
657                json_value_into_ipld(put.record),
658            )];
659            let keypair = srv.pds_keypair.clone();
660            srv.repo.mutate_repo(&did, &mutations, &keypair)?;
661            bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
662            Ok(json!({}))
663        }
664        "com.atproto.repo.deleteRecord" => {
665            let delete: com_atproto::repo::DeleteRecord = rouille::input::json_input(request)?;
666            let did = Did::from_str(&delete.did)?;
667            let collection = Nsid::from_str(&delete.collection)?;
668            let tid = Tid::from_str(&delete.rkey)?;
669            let mut srv = srv.lock().unwrap();
670            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
671
672            let mutations: Vec<Mutation> = vec![Mutation::Delete(collection, tid)];
673            let keypair = srv.pds_keypair.clone();
674            srv.repo.mutate_repo(&did, &mutations, &keypair)?;
675            bsky_mutate_db(&mut srv.atp_db, &did, mutations)?;
676            Ok(json!({}))
677        }
678        "com.atproto.sync.updateRepo" => {
679            // TODO: all other XRPC POST methods removed params (eg, 'did' in this case)
680            let did = Did::from_str(&xrpc_required_param(request, "repo")?)?;
681            // important that this read is before we take the mutex, because it could be slow!
682            let mut car_bytes: Vec<u8> = Default::default();
683            // TODO: unwrap()
684            request.data().unwrap().read_to_end(&mut car_bytes)?;
685            let mut srv = srv.lock().unwrap();
686            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
687            srv.repo
688                .import_car_bytes(&car_bytes, Some(did.to_string()))?;
689            // TODO: need to update atp_db
690            Ok(json!({}))
691        }
692        // =========== app.bsky methods
693        "app.bsky.actor.updateProfile" => {
694            let profile: app_bsky::ProfileRecord = rouille::input::json_input(request)?;
695            let mut srv = srv.lock().unwrap();
696            let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
697            bsky_update_profile(&mut srv, auth_did, profile)?;
698            Ok(json!({}))
699        }
700        "app.bsky.notification.updateSeen" => {
701            // TODO: actual implementation
702            let mut srv = srv.lock().unwrap();
703            let _auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
704            Ok(json!({}))
705        }
706        _ => Err(anyhow!(XrpcError::NotFound(format!(
707            "XRPC endpoint handler not found: {method}"
708        )))),
709    }
710}
711
712fn home_view_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<String> {
713    let host = request.header("Host").unwrap_or("localhost");
714
715    // check if the hostname resolves to a DID (account)
716    let did: Option<Did> = {
717        // this mutex lock should drop at the end of this block
718        let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
719        srv.atp_db.resolve_handle(host)?
720    };
721    if did.is_some() {
722        account_view_handler(srv, host, request)
723    } else {
724        let view = GenericHomeView {
725            domain: host.to_string(),
726        };
727        Ok(view.render()?)
728    }
729}
730
731fn did_doc_view_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Response> {
732    let host = request.header("Host").unwrap_or("localhost");
733    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
734    if let Some(did) = srv.atp_db.resolve_handle(host)? {
735        if did.to_string().starts_with("did:web:") {
736            let did_doc = srv.atp_db.get_did_doc(&did)?;
737            return Ok(Response::json(&did_doc));
738        }
739    };
740    Err(XrpcError::NotFound(
741        "no did:web: account registered at this domain".to_string(),
742    ))?
743}
744
745// TODO: did, collection, tid have already been parsed by this point
746fn account_view_handler(
747    srv: &Mutex<AtpService>,
748    handle: &str,
749    request: &Request,
750) -> Result<String> {
751    let host = request.header("Host").unwrap_or("localhost");
752    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
753    // TODO: unwrap as 404
754    let did = srv
755        .atp_db
756        .resolve_handle(handle)?
757        .ok_or(XrpcError::NotFound(format!(
758            "no DID found for handle: {handle}"
759        )))?;
760
761    Ok(AccountView {
762        domain: host.to_string(),
763        did: did.clone(),
764        profile: bsky_get_profile_detailed(&mut srv, &did)?,
765        feed: bsky_get_author_feed(&mut srv, &did)?.feed,
766    }
767    .render()?)
768}
769
770fn thread_view_handler(
771    srv: &Mutex<AtpService>,
772    handle: &str,
773    tid: &Tid,
774    request: &Request,
775) -> Result<String> {
776    let host = request.header("Host").unwrap_or("localhost");
777    let collection = Nsid::from_str("app.bsky.feed.post")?;
778    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
779    // TODO: not unwrap
780    let did = srv.atp_db.resolve_handle(handle)?.unwrap();
781
782    // TODO: could construct URI directly
783    let uri = AtUri::from_str(&format!("at://{did}/{collection}/{tid}"))?;
784    Ok(ThreadView {
785        domain: host.to_string(),
786        did,
787        collection,
788        tid: tid.clone(),
789        post: bsky_get_thread(&mut srv, &uri, None)?.thread,
790    }
791    .render()?)
792}
793
794fn repo_view_handler(srv: &Mutex<AtpService>, did: &str, request: &Request) -> Result<String> {
795    let host = request.header("Host").unwrap_or("localhost");
796    let did = Did::from_str(did)?;
797
798    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
799    let did_doc = srv.atp_db.get_did_doc(&did)?;
800    let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
801    let commit = srv.repo.get_commit(commit_cid)?;
802    let collections: Vec<String> = srv.repo.collections(&did)?;
803    let desc = com_atproto::repo::Describe {
804        handle: did.to_string(), // TODO
805        did: did.to_string(),
806        didDoc: did_doc,
807        collections,
808        handleIsCorrect: true,
809    };
810
811    Ok(RepoView {
812        domain: host.to_string(),
813        did,
814        commit,
815        describe: desc,
816    }
817    .render()?)
818}
819
820fn collection_view_handler(
821    srv: &Mutex<AtpService>,
822    did: &str,
823    collection: &str,
824    request: &Request,
825) -> Result<String> {
826    let host = request.header("Host").unwrap_or("localhost");
827    let did = Did::from_str(did)?;
828    let collection = Nsid::from_str(collection)?;
829
830    let mut record_list: Vec<Value> = vec![];
831    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
832    let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
833    let last_commit = srv.repo.get_commit(commit_cid)?;
834    let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
835    let prefix = format!("{collection}/");
836    for (mst_key, cid) in full_map.iter() {
837        debug!("{}", mst_key);
838        if mst_key.starts_with(&prefix) {
839            let record = srv.repo.get_ipld(cid)?;
840            record_list.push(json!({
841                "uri": format!("at://{did}{mst_key}"),
842                "tid": mst_key.split('/').nth(2).unwrap(),
843                "cid": cid,
844                "value": ipld_into_json_value(record),
845            }));
846        }
847    }
848
849    Ok(CollectionView {
850        domain: host.to_string(),
851        did,
852        collection,
853        records: record_list,
854    }
855    .render()?)
856}
857
858fn record_view_handler(
859    srv: &Mutex<AtpService>,
860    did: &str,
861    collection: &str,
862    tid: &str,
863    request: &Request,
864) -> Result<String> {
865    let host = request.header("Host").unwrap_or("localhost");
866    let did = Did::from_str(did)?;
867    let collection = Nsid::from_str(collection)?;
868    let rkey = Tid::from_str(tid)?;
869
870    let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
871    let key = format!("{collection}/{rkey}");
872    let record = match srv.repo.get_atp_record(&did, &collection, &rkey) {
873        Ok(Some(ipld)) => ipld_into_json_value(ipld),
874        Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
875            "could not find record: {key}"
876        ))))?,
877        Err(e) => Err(e)?,
878    };
879    Ok(RecordView {
880        domain: host.to_string(),
881        did,
882        collection,
883        tid: rkey,
884        record,
885    }
886    .render()?)
887}