Skip to main content

matrix_bot_sdk/
appservice.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex as StdMutex};
3
4use anyhow::Context;
5use async_trait::async_trait;
6use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
7use regex::Regex;
8use reqwest::Method;
9use salvo::http::StatusCode;
10use salvo::prelude::*;
11use serde::{Deserialize, Serialize};
12use serde_json::{Value, json};
13use tokio::sync::{Mutex as AsyncMutex, RwLock, broadcast};
14use url::{Url, form_urlencoded};
15use uuid::Uuid;
16
17use crate::client::{MatrixAuth, MatrixClient};
18use crate::models::CreateRoom;
19use crate::preprocessors::IPreprocessor;
20
21pub mod http_responses {
22    use serde::{Deserialize, Serialize};
23
24    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25    pub struct EmptyResponse {}
26
27    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
28    pub struct ErrorResponse {
29        pub error: String,
30        pub errcode: String,
31    }
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
35pub struct AppserviceTransaction {
36    pub transaction_id: String,
37    #[serde(default)]
38    pub events: Vec<Value>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
42pub struct PingHomeserverResponse {
43    pub duration_ms: f64,
44}
45
46#[async_trait]
47pub trait AppserviceHandler: Send + Sync {
48    async fn on_transaction(&self, _txn_id: &str, _body: &Value) -> anyhow::Result<()> {
49        Ok(())
50    }
51
52    async fn query_user(&self, _user_id: &str) -> anyhow::Result<Option<Value>> {
53        Ok(None)
54    }
55
56    async fn query_room_alias(&self, _room_alias: &str) -> anyhow::Result<Option<Value>> {
57        Ok(None)
58    }
59
60    async fn query_key_claim(&self, _body: &Value) -> anyhow::Result<Option<Value>> {
61        Ok(None)
62    }
63
64    async fn query_key(&self, _body: &Value) -> anyhow::Result<Option<Value>> {
65        Ok(None)
66    }
67
68    async fn thirdparty_protocol(&self, _protocol: &str) -> anyhow::Result<Option<Value>> {
69        Ok(None)
70    }
71
72    async fn thirdparty_user_remote(
73        &self,
74        _protocol: &str,
75        _fields: &HashMap<String, String>,
76    ) -> anyhow::Result<Vec<Value>> {
77        Ok(Vec::new())
78    }
79
80    async fn thirdparty_user_matrix(&self, _user_id: &str) -> anyhow::Result<Vec<Value>> {
81        Ok(Vec::new())
82    }
83
84    async fn thirdparty_location_remote(
85        &self,
86        _protocol: &str,
87        _fields: &HashMap<String, String>,
88    ) -> anyhow::Result<Vec<Value>> {
89        Ok(Vec::new())
90    }
91
92    async fn thirdparty_location_matrix(&self, _alias: &str) -> anyhow::Result<Vec<Value>> {
93        Ok(Vec::new())
94    }
95}
96
97#[derive(Debug, Default)]
98pub struct NoopAppserviceHandler;
99
100#[async_trait]
101impl AppserviceHandler for NoopAppserviceHandler {}
102
103#[derive(Clone)]
104pub struct Appservice {
105    pub homeserver_token: String,
106    pub appservice_token: String,
107    pub client: MatrixClient,
108    appservice_id: Option<String>,
109    homeserver_name: Option<String>,
110    user_prefix: Option<String>,
111    alias_prefix: Option<String>,
112    user_namespace_regexes: Vec<Regex>,
113    alias_namespace_regexes: Vec<Regex>,
114    protocols: Vec<String>,
115    handlers: Arc<dyn AppserviceHandler>,
116    preprocessors: Arc<RwLock<Vec<Arc<dyn IPreprocessor>>>>,
117    intents_cache: Arc<StdMutex<HashMap<String, Intent>>>,
118    pending_transactions: Arc<AsyncMutex<HashMap<String, broadcast::Sender<bool>>>>,
119    completed_transactions: Arc<RwLock<HashSet<String>>>,
120    ping_request: Arc<RwLock<Option<String>>>,
121}
122
123impl Default for Appservice {
124    fn default() -> Self {
125        let client = MatrixClient::new(
126            Url::parse("http://localhost").expect("localhost url must be valid"),
127            MatrixAuth::new(""),
128        );
129        Self::new("", "", client)
130    }
131}
132
133impl Appservice {
134    pub fn new(
135        homeserver_token: impl Into<String>,
136        appservice_token: impl Into<String>,
137        client: MatrixClient,
138    ) -> Self {
139        Self {
140            homeserver_token: homeserver_token.into(),
141            appservice_token: appservice_token.into(),
142            client,
143            appservice_id: None,
144            homeserver_name: None,
145            user_prefix: None,
146            alias_prefix: None,
147            user_namespace_regexes: Vec::new(),
148            alias_namespace_regexes: Vec::new(),
149            protocols: Vec::new(),
150            handlers: Arc::new(NoopAppserviceHandler),
151            preprocessors: Arc::new(RwLock::new(Vec::new())),
152            intents_cache: Arc::new(StdMutex::new(HashMap::new())),
153            pending_transactions: Arc::new(AsyncMutex::new(HashMap::new())),
154            completed_transactions: Arc::new(RwLock::new(HashSet::new())),
155            ping_request: Arc::new(RwLock::new(None)),
156        }
157    }
158
159    pub fn with_appservice_id(mut self, appservice_id: impl Into<String>) -> Self {
160        self.appservice_id = Some(appservice_id.into());
161        self
162    }
163
164    pub fn with_homeserver_name(mut self, homeserver_name: impl Into<String>) -> Self {
165        self.homeserver_name = Some(homeserver_name.into());
166        self
167    }
168
169    pub fn with_user_prefix(mut self, user_prefix: impl Into<String>) -> Self {
170        self.user_prefix = Some(user_prefix.into());
171        self
172    }
173
174    pub fn with_alias_prefix(mut self, alias_prefix: impl Into<String>) -> Self {
175        self.alias_prefix = Some(alias_prefix.into());
176        self
177    }
178
179    pub fn with_user_namespace_regexes<I, S>(mut self, regexes: I) -> Self
180    where
181        I: IntoIterator<Item = S>,
182        S: Into<String>,
183    {
184        self.user_namespace_regexes = regexes
185            .into_iter()
186            .filter_map(|regex| Regex::new(&regex.into()).ok())
187            .collect();
188        self
189    }
190
191    pub fn with_alias_namespace_regexes<I, S>(mut self, regexes: I) -> Self
192    where
193        I: IntoIterator<Item = S>,
194        S: Into<String>,
195    {
196        self.alias_namespace_regexes = regexes
197            .into_iter()
198            .filter_map(|regex| Regex::new(&regex.into()).ok())
199            .collect();
200        self
201    }
202
203    pub fn with_protocols<I, S>(mut self, protocols: I) -> Self
204    where
205        I: IntoIterator<Item = S>,
206        S: Into<String>,
207    {
208        self.protocols = protocols.into_iter().map(Into::into).collect();
209        self
210    }
211
212    pub fn with_handler(mut self, handler: Arc<dyn AppserviceHandler>) -> Self {
213        self.handlers = handler;
214        self
215    }
216
217    fn resolved_homeserver_name(&self) -> String {
218        if let Some(name) = &self.homeserver_name {
219            return name.clone();
220        }
221        let host = self.client.homeserver().host_str().unwrap_or("localhost");
222        if let Some(port) = self.client.homeserver().port() {
223            format!("{host}:{port}")
224        } else {
225            host.to_owned()
226        }
227    }
228
229    pub fn get_intent(&self, localpart: &str) -> Intent {
230        self.get_intent_for_user_id(&self.get_user_id(localpart))
231    }
232
233    pub fn get_user_id(&self, localpart: &str) -> String {
234        format!("@{localpart}:{}", self.resolved_homeserver_name())
235    }
236
237    pub fn get_intent_for_suffix(&self, suffix: &str) -> anyhow::Result<Intent> {
238        Ok(self.get_intent_for_user_id(&self.get_user_id_for_suffix(suffix)?))
239    }
240
241    pub fn get_user_id_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
242        let Some(prefix) = &self.user_prefix else {
243            anyhow::bail!("Cannot use get_user_id_for_suffix without a configured user prefix")
244        };
245        Ok(format!(
246            "{prefix}{suffix}:{}",
247            self.resolved_homeserver_name()
248        ))
249    }
250
251    pub fn get_intent_for_user_id(&self, user_id: &str) -> Intent {
252        if let Some(existing) = self
253            .intents_cache
254            .lock()
255            .expect("intent cache lock poisoned")
256            .get(user_id)
257            .cloned()
258        {
259            return existing;
260        }
261
262        let intent = Intent::new(user_id.to_owned(), self.client.clone());
263        self.intents_cache
264            .lock()
265            .expect("intent cache lock poisoned")
266            .insert(user_id.to_owned(), intent.clone());
267        intent
268    }
269
270    pub fn get_suffix_for_user_id(&self, user_id: &str) -> anyhow::Result<Option<String>> {
271        let Some(prefix) = &self.user_prefix else {
272            anyhow::bail!("Cannot use get_suffix_for_user_id without a configured user prefix")
273        };
274        Ok(self.suffix_for_entity(user_id, prefix))
275    }
276
277    pub fn is_namespaced_user(&self, user_id: &str) -> bool {
278        if self
279            .user_namespace_regexes
280            .iter()
281            .any(|regex| regex.is_match(user_id))
282        {
283            return true;
284        }
285        self.user_prefix
286            .as_ref()
287            .and_then(|prefix| self.suffix_for_entity(user_id, prefix))
288            .is_some()
289    }
290
291    pub fn get_alias(&self, localpart: &str) -> String {
292        format!("#{localpart}:{}", self.resolved_homeserver_name())
293    }
294
295    pub fn get_alias_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
296        let Some(prefix) = &self.alias_prefix else {
297            anyhow::bail!("Cannot use get_alias_for_suffix without a configured alias prefix")
298        };
299        Ok(format!(
300            "{prefix}{suffix}:{}",
301            self.resolved_homeserver_name()
302        ))
303    }
304
305    pub fn get_alias_localpart_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
306        let Some(prefix) = &self.alias_prefix else {
307            anyhow::bail!(
308                "Cannot use get_alias_localpart_for_suffix without a configured alias prefix"
309            )
310        };
311        Ok(format!("{}{suffix}", prefix.trim_start_matches('#')))
312    }
313
314    pub fn get_suffix_for_alias(&self, alias: &str) -> anyhow::Result<Option<String>> {
315        let Some(prefix) = &self.alias_prefix else {
316            anyhow::bail!("Cannot use get_suffix_for_alias without a configured alias prefix")
317        };
318        Ok(self.suffix_for_entity(alias, prefix))
319    }
320
321    pub fn is_namespaced_alias(&self, alias: &str) -> bool {
322        if self
323            .alias_namespace_regexes
324            .iter()
325            .any(|regex| regex.is_match(alias))
326        {
327            return true;
328        }
329        self.alias_prefix
330            .as_ref()
331            .and_then(|prefix| self.suffix_for_entity(alias, prefix))
332            .is_some()
333    }
334
335    pub async fn add_preprocessor(&self, preprocessor: Arc<dyn IPreprocessor>) {
336        self.preprocessors.write().await.push(preprocessor);
337    }
338
339    pub async fn set_room_directory_visibility(
340        &self,
341        network_id: &str,
342        room_id: &str,
343        visibility: &str,
344    ) -> anyhow::Result<Value> {
345        if visibility != "public" && visibility != "private" {
346            anyhow::bail!("visibility must be 'public' or 'private'");
347        }
348        let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
349        let network_id = utf8_percent_encode(network_id, NON_ALPHANUMERIC).to_string();
350        self.client
351            .raw_json(
352                Method::PUT,
353                &format!("/_matrix/client/v3/directory/list/appservice/{network_id}/{room_id}"),
354                Some(json!({ "visibility": visibility })),
355            )
356            .await
357    }
358
359    pub fn router(&self) -> Router {
360        build_router_with_appservice(self.clone())
361    }
362
363    /// Present for API parity with the TypeScript SDK where the appservice starts
364    /// its own web server. In Rust, server startup is controlled by the host
365    /// application using `router()`.
366    pub async fn begin(&self) -> anyhow::Result<()> {
367        Ok(())
368    }
369
370    /// Present for API parity with the TypeScript SDK.
371    pub fn stop(&self) {}
372
373    pub async fn expect_ping_transaction(&self, transaction_id: impl Into<String>) {
374        *self.ping_request.write().await = Some(transaction_id.into());
375    }
376
377    pub async fn expected_ping_transaction(&self) -> Option<String> {
378        self.ping_request.read().await.clone()
379    }
380
381    pub async fn ping_homeserver(&self) -> anyhow::Result<PingHomeserverResponse> {
382        let appservice_id = self
383            .appservice_id
384            .as_ref()
385            .context("No `id` given in registration information. Cannot ping homeserver")?;
386        let transaction_id = format!("matrix-bot-sdk_{}", Uuid::new_v4());
387        self.expect_ping_transaction(transaction_id.clone()).await;
388        let endpoint = format!(
389            "/_matrix/client/v1/appservice/{}/ping",
390            utf8_percent_encode(appservice_id, NON_ALPHANUMERIC)
391        );
392        let response = self
393            .client
394            .raw_json(
395                Method::POST,
396                &endpoint,
397                Some(json!({ "transaction_id": transaction_id })),
398            )
399            .await?;
400        serde_json::from_value(response).context("invalid ping response payload")
401    }
402
403    async fn acquire_transaction_slot(&self, txn_id: &str) -> TransactionSlot {
404        if self.completed_transactions.read().await.contains(txn_id) {
405            return TransactionSlot::Completed;
406        }
407        let mut pending = self.pending_transactions.lock().await;
408        if let Some(sender) = pending.get(txn_id) {
409            return TransactionSlot::Pending(sender.subscribe());
410        }
411        let (sender, _receiver) = broadcast::channel(1);
412        pending.insert(txn_id.to_owned(), sender.clone());
413        TransactionSlot::Owner(sender)
414    }
415
416    async fn mark_transaction_completed(&self, txn_id: &str) {
417        self.completed_transactions
418            .write()
419            .await
420            .insert(txn_id.to_owned());
421    }
422
423    async fn clear_pending_transaction(&self, txn_id: &str) {
424        self.pending_transactions.lock().await.remove(txn_id);
425    }
426
427    async fn handle_transaction(&self, txn_id: &str, body: &Value) -> anyhow::Result<()> {
428        let mut processed = body.clone();
429        let preprocessors = self.preprocessors.read().await.clone();
430
431        if !preprocessors.is_empty()
432            && let Some(events) = processed.get_mut("events").and_then(Value::as_array_mut)
433        {
434            for event in events {
435                for preprocessor in &preprocessors {
436                    preprocessor.process(event).await?;
437                }
438            }
439        }
440
441        if !preprocessors.is_empty()
442            && let Some(events) = processed
443                .get_mut("de.sorunome.msc2409.ephemeral")
444                .and_then(Value::as_array_mut)
445        {
446            for event in events {
447                for preprocessor in &preprocessors {
448                    preprocessor.process(event).await?;
449                }
450            }
451        }
452
453        self.handlers.on_transaction(txn_id, &processed).await
454    }
455
456    fn supports_protocol(&self, protocol: &str) -> bool {
457        self.protocols.iter().any(|p| p == protocol)
458    }
459
460    fn suffix_for_entity(&self, value: &str, prefix: &str) -> Option<String> {
461        let homeserver = self.resolved_homeserver_name();
462        let suffix = format!(":{homeserver}");
463        if !value.starts_with(prefix) || !value.ends_with(&suffix) {
464            return None;
465        }
466        let middle = &value[prefix.len()..value.len().saturating_sub(suffix.len())];
467        if middle.is_empty() {
468            None
469        } else {
470            Some(middle.to_owned())
471        }
472    }
473}
474
475pub fn build_router() -> Router {
476    build_router_with_appservice(Appservice::default())
477}
478
479pub fn build_router_with_appservice(appservice: Appservice) -> Router {
480    Router::new()
481        .hoop(InjectAppservice::new(appservice))
482        .push(Router::with_path("health").get(health_handler))
483        .push(Router::with_path("users/{user_id}").get(user_handler))
484        .push(Router::with_path("rooms/{room_alias}").get(room_alias_handler))
485        .push(Router::with_path("transactions/{txn_id}").put(transaction_handler))
486        .push(Router::with_path("_matrix/app/v1/users/{user_id}").get(user_handler))
487        .push(Router::with_path("_matrix/app/v1/rooms/{room_alias}").get(room_alias_handler))
488        .push(Router::with_path("_matrix/app/v1/transactions/{txn_id}").put(transaction_handler))
489        .push(
490            Router::with_path("_matrix/app/v1/thirdparty/protocol/{protocol}")
491                .get(thirdparty_protocol_handler),
492        )
493        .push(
494            Router::with_path("_matrix/app/v1/thirdparty/user/{protocol}")
495                .get(thirdparty_user_handler),
496        )
497        .push(Router::with_path("_matrix/app/v1/thirdparty/user").get(thirdparty_user_handler))
498        .push(
499            Router::with_path("_matrix/app/v1/thirdparty/location/{protocol}")
500                .get(thirdparty_location_handler),
501        )
502        .push(
503            Router::with_path("_matrix/app/v1/thirdparty/location")
504                .get(thirdparty_location_handler),
505        )
506        .push(
507            Router::with_path("_matrix/app/unstable/org.matrix.msc3983/keys/claim")
508                .post(keys_claim_handler),
509        )
510        .push(
511            Router::with_path("_matrix/app/unstable/org.matrix.msc3984/keys/query")
512                .post(keys_query_handler),
513        )
514        .push(
515            Router::with_path("_matrix/app/v1/unstable/org.matrix.msc3983/keys/claim")
516                .post(keys_claim_handler),
517        )
518        .push(Router::with_path("unstable/org.matrix.msc3983/keys/claim").post(keys_claim_handler))
519        .push(
520            Router::with_path("_matrix/app/v1/unstable/org.matrix.msc3984/keys/query")
521                .post(keys_query_handler),
522        )
523        .push(Router::with_path("unstable/org.matrix.msc3984/keys/query").post(keys_query_handler))
524        .push(Router::with_path("_matrix/app/v1/ping").post(ping_handler))
525        .push(Router::with_path("{**rest}").goal(unrecognized_handler))
526}
527
528#[derive(Clone)]
529struct InjectAppservice {
530    appservice: Appservice,
531}
532
533impl InjectAppservice {
534    fn new(appservice: Appservice) -> Self {
535        Self { appservice }
536    }
537}
538
539#[handler]
540impl InjectAppservice {
541    async fn handle(
542        &self,
543        req: &mut Request,
544        depot: &mut Depot,
545        res: &mut Response,
546        ctrl: &mut FlowCtrl,
547    ) {
548        depot.inject(self.appservice.clone());
549        ctrl.call_next(req, depot, res).await;
550    }
551}
552
553enum TransactionSlot {
554    Completed,
555    Pending(broadcast::Receiver<bool>),
556    Owner(broadcast::Sender<bool>),
557}
558
559#[derive(Copy, Clone)]
560enum ThirdpartyObjectKind {
561    User,
562    Location,
563}
564
565fn appservice_from_depot(depot: &Depot) -> Option<Appservice> {
566    depot.obtain::<Appservice>().ok().cloned()
567}
568
569fn respond_error(res: &mut Response, status: StatusCode, errcode: &str, error: &str) {
570    res.status_code(status);
571    res.render(Json(json!({
572        "errcode": errcode,
573        "error": error,
574    })));
575}
576
577fn respond_ok_empty(res: &mut Response) {
578    res.status_code(StatusCode::OK);
579    res.render(Json(json!({})));
580}
581
582fn provided_access_token(req: &Request) -> Option<String> {
583    if let Some(auth_header) = req.header::<String>("authorization") {
584        return auth_header.strip_prefix("Bearer ").map(ToOwned::to_owned);
585    }
586    req.query::<String>("access_token")
587}
588
589fn is_authed(appservice: &Appservice, req: &Request) -> bool {
590    provided_access_token(req)
591        .as_deref()
592        .is_some_and(|token| token == appservice.homeserver_token)
593}
594
595fn require_auth(appservice: &Appservice, req: &Request, res: &mut Response) -> bool {
596    if is_authed(appservice, req) {
597        true
598    } else {
599        respond_error(
600            res,
601            StatusCode::UNAUTHORIZED,
602            "AUTH_FAILED",
603            "Authentication failed",
604        );
605        false
606    }
607}
608
609async fn parse_json_object(req: &mut Request, res: &mut Response) -> Option<Value> {
610    let body = match req.parse_json::<Value>().await {
611        Ok(value) => value,
612        Err(_) => {
613            respond_error(res, StatusCode::BAD_REQUEST, "BAD_REQUEST", "Expected JSON");
614            return None;
615        }
616    };
617    if !body.is_object() {
618        respond_error(res, StatusCode::BAD_REQUEST, "BAD_REQUEST", "Expected JSON");
619        return None;
620    }
621    Some(body)
622}
623
624fn query_fields_without_access_token(req: &Request) -> HashMap<String, String> {
625    let mut fields = HashMap::new();
626    let Some(raw_query) = req.uri().query() else {
627        return fields;
628    };
629    for (key, value) in form_urlencoded::parse(raw_query.as_bytes()) {
630        if key == "access_token" {
631            continue;
632        }
633        fields.insert(key.into_owned(), value.into_owned());
634    }
635    fields
636}
637
638fn room_alias_localpart(room_alias: &str) -> String {
639    room_alias
640        .strip_prefix('#')
641        .unwrap_or(room_alias)
642        .split(':')
643        .next()
644        .unwrap_or_default()
645        .to_owned()
646}
647
648#[handler]
649async fn health_handler(res: &mut Response) {
650    res.render(Json(json!({"ok": true})));
651}
652
653#[handler]
654async fn unrecognized_handler(res: &mut Response) {
655    respond_error(
656        res,
657        StatusCode::NOT_FOUND,
658        "M_UNRECOGNIZED",
659        "Endpoint not implemented",
660    );
661}
662
663#[handler]
664async fn transaction_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
665    let Some(appservice) = appservice_from_depot(depot) else {
666        respond_error(
667            res,
668            StatusCode::INTERNAL_SERVER_ERROR,
669            "M_UNKNOWN",
670            "Appservice state unavailable",
671        );
672        return;
673    };
674    if !require_auth(&appservice, req, res) {
675        return;
676    }
677
678    let Some(body) = parse_json_object(req, res).await else {
679        return;
680    };
681    if body.get("events").and_then(Value::as_array).is_none() {
682        respond_error(
683            res,
684            StatusCode::BAD_REQUEST,
685            "BAD_REQUEST",
686            "Invalid JSON: expected events",
687        );
688        return;
689    }
690
691    let txn_id = req.param::<String>("txn_id").unwrap_or_default();
692    match appservice.acquire_transaction_slot(&txn_id).await {
693        TransactionSlot::Completed => {
694            respond_ok_empty(res);
695        }
696        TransactionSlot::Pending(mut receiver) => match receiver.recv().await {
697            Ok(true) => respond_ok_empty(res),
698            _ => {
699                res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
700                res.render(Json(json!({})));
701            }
702        },
703        TransactionSlot::Owner(sender) => {
704            let success = appservice.handle_transaction(&txn_id, &body).await.is_ok();
705            if success {
706                appservice.mark_transaction_completed(&txn_id).await;
707                respond_ok_empty(res);
708            } else {
709                res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
710                res.render(Json(json!({})));
711            }
712            let _ = sender.send(success);
713            appservice.clear_pending_transaction(&txn_id).await;
714        }
715    }
716}
717
718#[handler]
719async fn user_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
720    let Some(appservice) = appservice_from_depot(depot) else {
721        respond_error(
722            res,
723            StatusCode::INTERNAL_SERVER_ERROR,
724            "M_UNKNOWN",
725            "Appservice state unavailable",
726        );
727        return;
728    };
729    if !require_auth(&appservice, req, res) {
730        return;
731    }
732
733    let user_id = req.param::<String>("user_id").unwrap_or_default();
734    match appservice.handlers.query_user(&user_id).await {
735        Ok(Some(result)) => {
736            res.status_code(StatusCode::OK);
737            res.render(Json(result));
738        }
739        Ok(None) => {
740            respond_error(
741                res,
742                StatusCode::NOT_FOUND,
743                "USER_DOES_NOT_EXIST",
744                "User not created",
745            );
746        }
747        Err(_) => {
748            respond_error(
749                res,
750                StatusCode::INTERNAL_SERVER_ERROR,
751                "M_UNKNOWN",
752                "Failed to query user",
753            );
754        }
755    }
756}
757
758#[handler]
759async fn room_alias_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
760    let Some(appservice) = appservice_from_depot(depot) else {
761        respond_error(
762            res,
763            StatusCode::INTERNAL_SERVER_ERROR,
764            "M_UNKNOWN",
765            "Appservice state unavailable",
766        );
767        return;
768    };
769    if !require_auth(&appservice, req, res) {
770        return;
771    }
772
773    let room_alias = req.param::<String>("room_alias").unwrap_or_default();
774    let result = match appservice.handlers.query_room_alias(&room_alias).await {
775        Ok(value) => value,
776        Err(_) => {
777            respond_error(
778                res,
779                StatusCode::INTERNAL_SERVER_ERROR,
780                "M_UNKNOWN",
781                "Failed to query room alias",
782            );
783            return;
784        }
785    };
786    let Some(value) = result else {
787        respond_error(
788            res,
789            StatusCode::NOT_FOUND,
790            "ROOM_DOES_NOT_EXIST",
791            "Room not created",
792        );
793        return;
794    };
795
796    let mut payload = match value {
797        Value::Object(map) => map,
798        _ => {
799            respond_error(
800                res,
801                StatusCode::INTERNAL_SERVER_ERROR,
802                "M_UNKNOWN",
803                "Room query handler must return an object",
804            );
805            return;
806        }
807    };
808    payload.insert(
809        "room_alias_name".to_owned(),
810        Value::String(room_alias_localpart(&room_alias)),
811    );
812    let create_room: CreateRoom = match serde_json::from_value(Value::Object(payload.clone())) {
813        Ok(options) => options,
814        Err(_) => {
815            respond_error(
816                res,
817                StatusCode::INTERNAL_SERVER_ERROR,
818                "M_UNKNOWN",
819                "Invalid room create options",
820            );
821            return;
822        }
823    };
824    match appservice.client.create_room(&create_room).await {
825        Ok(room_id) => {
826            payload.insert("__roomId".to_owned(), Value::String(room_id));
827            res.status_code(StatusCode::OK);
828            res.render(Json(Value::Object(payload)));
829        }
830        Err(_) => {
831            respond_error(
832                res,
833                StatusCode::INTERNAL_SERVER_ERROR,
834                "M_UNKNOWN",
835                "Failed to create room",
836            );
837        }
838    }
839}
840
841#[handler]
842async fn keys_claim_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
843    let Some(appservice) = appservice_from_depot(depot) else {
844        respond_error(
845            res,
846            StatusCode::INTERNAL_SERVER_ERROR,
847            "M_UNKNOWN",
848            "Appservice state unavailable",
849        );
850        return;
851    };
852    if !require_auth(&appservice, req, res) {
853        return;
854    }
855    let Some(body) = parse_json_object(req, res).await else {
856        return;
857    };
858    match appservice.handlers.query_key_claim(&body).await {
859        Ok(Some(result)) => {
860            res.status_code(StatusCode::OK);
861            res.render(Json(result));
862        }
863        Ok(None) => {
864            respond_error(
865                res,
866                StatusCode::NOT_FOUND,
867                "M_UNRECOGNIZED",
868                "Endpoint not implemented",
869            );
870        }
871        Err(_) => {
872            respond_error(
873                res,
874                StatusCode::INTERNAL_SERVER_ERROR,
875                "M_UNKNOWN",
876                "Error handling key claim API",
877            );
878        }
879    }
880}
881
882#[handler]
883async fn keys_query_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
884    let Some(appservice) = appservice_from_depot(depot) else {
885        respond_error(
886            res,
887            StatusCode::INTERNAL_SERVER_ERROR,
888            "M_UNKNOWN",
889            "Appservice state unavailable",
890        );
891        return;
892    };
893    if !require_auth(&appservice, req, res) {
894        return;
895    }
896    let Some(body) = parse_json_object(req, res).await else {
897        return;
898    };
899    match appservice.handlers.query_key(&body).await {
900        Ok(Some(result)) => {
901            res.status_code(StatusCode::OK);
902            res.render(Json(result));
903        }
904        Ok(None) => {
905            respond_error(
906                res,
907                StatusCode::NOT_FOUND,
908                "M_UNRECOGNIZED",
909                "Endpoint not implemented",
910            );
911        }
912        Err(_) => {
913            respond_error(
914                res,
915                StatusCode::INTERNAL_SERVER_ERROR,
916                "M_UNKNOWN",
917                "Error handling key query API",
918            );
919        }
920    }
921}
922
923#[handler]
924async fn thirdparty_protocol_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
925    let Some(appservice) = appservice_from_depot(depot) else {
926        respond_error(
927            res,
928            StatusCode::INTERNAL_SERVER_ERROR,
929            "M_UNKNOWN",
930            "Appservice state unavailable",
931        );
932        return;
933    };
934    if !require_auth(&appservice, req, res) {
935        return;
936    }
937
938    let protocol = req.param::<String>("protocol").unwrap_or_default();
939    if !appservice.supports_protocol(&protocol) {
940        respond_error(
941            res,
942            StatusCode::NOT_FOUND,
943            "PROTOCOL_NOT_HANDLED",
944            "Protocol is not handled by this appservice",
945        );
946        return;
947    }
948    match appservice.handlers.thirdparty_protocol(&protocol).await {
949        Ok(Some(result)) => {
950            res.status_code(StatusCode::OK);
951            res.render(Json(result));
952        }
953        Ok(None) => {
954            respond_error(
955                res,
956                StatusCode::NOT_FOUND,
957                "M_UNRECOGNIZED",
958                "Endpoint not implemented",
959            );
960        }
961        Err(_) => {
962            respond_error(
963                res,
964                StatusCode::INTERNAL_SERVER_ERROR,
965                "M_UNKNOWN",
966                "Failed to query third-party protocol",
967            );
968        }
969    }
970}
971
972async fn handle_thirdparty_object(
973    req: &Request,
974    res: &mut Response,
975    appservice: &Appservice,
976    kind: ThirdpartyObjectKind,
977) {
978    let protocol = req.param::<String>("protocol");
979    let matrix_id = match kind {
980        ThirdpartyObjectKind::User => req.query::<String>("userid"),
981        ThirdpartyObjectKind::Location => req.query::<String>("alias"),
982    };
983    let response_items = if let Some(protocol) = protocol {
984        if !appservice.supports_protocol(&protocol) {
985            respond_error(
986                res,
987                StatusCode::NOT_FOUND,
988                "PROTOCOL_NOT_HANDLED",
989                "Protocol is not handled by this appservice",
990            );
991            return;
992        }
993        let fields = query_fields_without_access_token(req);
994        match kind {
995            ThirdpartyObjectKind::User => {
996                appservice
997                    .handlers
998                    .thirdparty_user_remote(&protocol, &fields)
999                    .await
1000            }
1001            ThirdpartyObjectKind::Location => {
1002                appservice
1003                    .handlers
1004                    .thirdparty_location_remote(&protocol, &fields)
1005                    .await
1006            }
1007        }
1008    } else if let Some(matrix_id) = matrix_id {
1009        match kind {
1010            ThirdpartyObjectKind::User => {
1011                appservice.handlers.thirdparty_user_matrix(&matrix_id).await
1012            }
1013            ThirdpartyObjectKind::Location => {
1014                appservice
1015                    .handlers
1016                    .thirdparty_location_matrix(&matrix_id)
1017                    .await
1018            }
1019        }
1020    } else {
1021        respond_error(
1022            res,
1023            StatusCode::BAD_REQUEST,
1024            "INVALID_PARAMETERS",
1025            "Invalid parameters given",
1026        );
1027        return;
1028    };
1029
1030    match response_items {
1031        Ok(items) if items.is_empty() => {
1032            respond_error(
1033                res,
1034                StatusCode::NOT_FOUND,
1035                "NO_MAPPING_FOUND",
1036                "No mappings found",
1037            );
1038        }
1039        Ok(items) => {
1040            res.status_code(StatusCode::OK);
1041            res.render(Json(Value::Array(items)));
1042        }
1043        Err(_) => {
1044            respond_error(
1045                res,
1046                StatusCode::INTERNAL_SERVER_ERROR,
1047                "M_UNKNOWN",
1048                "Failed to process third-party lookup",
1049            );
1050        }
1051    }
1052}
1053
1054#[handler]
1055async fn thirdparty_user_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1056    let Some(appservice) = appservice_from_depot(depot) else {
1057        respond_error(
1058            res,
1059            StatusCode::INTERNAL_SERVER_ERROR,
1060            "M_UNKNOWN",
1061            "Appservice state unavailable",
1062        );
1063        return;
1064    };
1065    if !require_auth(&appservice, req, res) {
1066        return;
1067    }
1068    handle_thirdparty_object(req, res, &appservice, ThirdpartyObjectKind::User).await;
1069}
1070
1071#[handler]
1072async fn thirdparty_location_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1073    let Some(appservice) = appservice_from_depot(depot) else {
1074        respond_error(
1075            res,
1076            StatusCode::INTERNAL_SERVER_ERROR,
1077            "M_UNKNOWN",
1078            "Appservice state unavailable",
1079        );
1080        return;
1081    };
1082    if !require_auth(&appservice, req, res) {
1083        return;
1084    }
1085    handle_thirdparty_object(req, res, &appservice, ThirdpartyObjectKind::Location).await;
1086}
1087
1088#[handler]
1089async fn ping_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1090    let Some(appservice) = appservice_from_depot(depot) else {
1091        respond_error(
1092            res,
1093            StatusCode::INTERNAL_SERVER_ERROR,
1094            "M_UNKNOWN",
1095            "Appservice state unavailable",
1096        );
1097        return;
1098    };
1099    if !require_auth(&appservice, req, res) {
1100        return;
1101    }
1102    let Some(body) = parse_json_object(req, res).await else {
1103        return;
1104    };
1105
1106    let Some(expected) = appservice.expected_ping_transaction().await else {
1107        respond_error(
1108            res,
1109            StatusCode::BAD_REQUEST,
1110            "BAD_REQUEST",
1111            "No ping request expected",
1112        );
1113        return;
1114    };
1115    let transaction_id = body
1116        .get("transaction_id")
1117        .and_then(Value::as_str)
1118        .unwrap_or_default();
1119    if transaction_id != expected {
1120        respond_error(
1121            res,
1122            StatusCode::BAD_REQUEST,
1123            "BAD_REQUEST",
1124            "transaction_id did not match",
1125        );
1126        return;
1127    }
1128    *appservice.ping_request.write().await = None;
1129    respond_ok_empty(res);
1130}
1131
1132#[derive(Clone)]
1133pub struct Intent {
1134    user_id: String,
1135    client: MatrixClient,
1136    storage: Option<Arc<dyn crate::storage::IAppserviceStorageProvider>>,
1137}
1138
1139impl Intent {
1140    pub fn new(user_id: impl Into<String>, client: MatrixClient) -> Self {
1141        Self {
1142            user_id: user_id.into(),
1143            client,
1144            storage: None,
1145        }
1146    }
1147
1148    pub fn with_storage(
1149        mut self,
1150        storage: Arc<dyn crate::storage::IAppserviceStorageProvider>,
1151    ) -> Self {
1152        self.storage = Some(storage);
1153        self
1154    }
1155
1156    pub fn user_id(&self) -> &str {
1157        &self.user_id
1158    }
1159
1160    pub fn underlying_client(&self) -> &MatrixClient {
1161        &self.client
1162    }
1163
1164    pub async fn ensure_registered(&self) -> anyhow::Result<()> {
1165        if let Some(storage) = &self.storage
1166            && storage.is_user_registered(&self.user_id).await?
1167        {
1168            return Ok(());
1169        }
1170
1171        let localpart = self
1172            .user_id
1173            .strip_prefix('@')
1174            .and_then(|s| s.split(':').next())
1175            .unwrap_or(&self.user_id);
1176
1177        let result = self
1178            .client
1179            .raw_json(
1180                Method::POST,
1181                "/_matrix/client/v3/register",
1182                Some(json!({
1183                    "type": "m.login.application_service",
1184                    "username": localpart,
1185                    "inhibit_login": true,
1186                })),
1187            )
1188            .await;
1189
1190        match result {
1191            Ok(_) => {
1192                if let Some(storage) = &self.storage {
1193                    storage.register_user(&self.user_id).await?;
1194                }
1195                Ok(())
1196            }
1197            Err(e) => {
1198                let err_str = e.to_string();
1199                if err_str.contains("M_USER_IN_USE") {
1200                    if let Some(storage) = &self.storage {
1201                        storage.register_user(&self.user_id).await?;
1202                    }
1203                    Ok(())
1204                } else {
1205                    Err(e)
1206                }
1207            }
1208        }
1209    }
1210
1211    pub async fn ensure_joined(&self, room_id_or_alias: &str) -> anyhow::Result<String> {
1212        self.ensure_registered().await?;
1213        self.client.join_room(room_id_or_alias).await
1214    }
1215
1216    pub async fn ensure_registered_and_joined(
1217        &self,
1218        room_id_or_alias: &str,
1219    ) -> anyhow::Result<String> {
1220        self.ensure_registered().await?;
1221        self.ensure_joined(room_id_or_alias).await
1222    }
1223
1224    pub async fn get_joined_rooms(&self) -> anyhow::Result<Vec<String>> {
1225        self.ensure_registered().await?;
1226        self.client.get_joined_rooms().await
1227    }
1228
1229    pub async fn refresh_joined_rooms(&self) -> anyhow::Result<Vec<String>> {
1230        self.get_joined_rooms().await
1231    }
1232
1233    pub async fn join_room(&self, room_id_or_alias: &str) -> anyhow::Result<String> {
1234        self.ensure_registered().await?;
1235        self.client.join_room(room_id_or_alias).await
1236    }
1237
1238    pub async fn leave_room(&self, room_id: &str, reason: Option<&str>) -> anyhow::Result<()> {
1239        self.ensure_registered().await?;
1240        self.client.leave_room(room_id, reason).await
1241    }
1242
1243    pub async fn send_text(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1244        self.ensure_registered_and_joined(room_id).await?;
1245        self.client.send_text(room_id, body).await
1246    }
1247
1248    pub async fn send_notice(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1249        self.ensure_registered_and_joined(room_id).await?;
1250        self.client.send_notice(room_id, body).await
1251    }
1252
1253    pub async fn send_event(
1254        &self,
1255        room_id: &str,
1256        event_type: &str,
1257        content: &Value,
1258    ) -> anyhow::Result<String> {
1259        self.ensure_registered_and_joined(room_id).await?;
1260        self.client.send_event(room_id, event_type, content).await
1261    }
1262
1263    pub async fn send_state_event(
1264        &self,
1265        room_id: &str,
1266        event_type: &str,
1267        state_key: &str,
1268        content: &Value,
1269    ) -> anyhow::Result<String> {
1270        self.ensure_registered_and_joined(room_id).await?;
1271        self.client
1272            .send_state_event(room_id, event_type, state_key, content)
1273            .await
1274    }
1275
1276    pub async fn send_message(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1277        self.send_text(room_id, body).await
1278    }
1279
1280    pub async fn invite_user(&self, user_id: &str, room_id: &str) -> anyhow::Result<()> {
1281        self.ensure_registered().await?;
1282        self.client.invite_user(user_id, room_id).await
1283    }
1284
1285    pub async fn kick_user(
1286        &self,
1287        user_id: &str,
1288        room_id: &str,
1289        reason: Option<&str>,
1290    ) -> anyhow::Result<()> {
1291        self.ensure_registered().await?;
1292        self.client.kick_user(user_id, room_id, reason).await
1293    }
1294
1295    pub async fn ban_user(
1296        &self,
1297        user_id: &str,
1298        room_id: &str,
1299        reason: Option<&str>,
1300    ) -> anyhow::Result<()> {
1301        self.ensure_registered().await?;
1302        self.client.ban_user(user_id, room_id, reason).await
1303    }
1304
1305    pub async fn unban_user(&self, user_id: &str, room_id: &str) -> anyhow::Result<()> {
1306        self.ensure_registered().await?;
1307        self.client.unban_user(user_id, room_id).await
1308    }
1309
1310    pub async fn redact_event(
1311        &self,
1312        room_id: &str,
1313        event_id: &str,
1314        reason: Option<&str>,
1315    ) -> anyhow::Result<String> {
1316        self.ensure_registered().await?;
1317        self.client.redact_event(room_id, event_id, reason).await
1318    }
1319
1320    pub async fn get_room_members(
1321        &self,
1322        room_id: &str,
1323    ) -> anyhow::Result<Vec<crate::models::events::MembershipEvent>> {
1324        self.ensure_registered().await?;
1325        self.client.get_room_members(room_id, None, None).await
1326    }
1327
1328    pub async fn enable_encryption(&self, room_id: &str) -> anyhow::Result<()> {
1329        self.ensure_registered_and_joined(room_id).await?;
1330        let content = json!({
1331            "algorithm": "m.megolm.v1.aes-sha2"
1332        });
1333        self.client
1334            .send_state_event(room_id, "m.room.encryption", "", &content)
1335            .await?;
1336        Ok(())
1337    }
1338
1339    pub fn unstable_apis(&self) -> UnstableIntentApis {
1340        UnstableIntentApis {
1341            intent: self.clone(),
1342        }
1343    }
1344}
1345
1346#[derive(Clone)]
1347pub struct UnstableIntentApis {
1348    intent: Intent,
1349}
1350
1351impl UnstableIntentApis {
1352    pub async fn knock_room(
1353        &self,
1354        room_id_or_alias: &str,
1355        reason: Option<&str>,
1356    ) -> anyhow::Result<String> {
1357        self.intent.ensure_registered().await?;
1358        let encoded = utf8_percent_encode(room_id_or_alias, NON_ALPHANUMERIC).to_string();
1359        let body = reason.map(|r| json!({ "reason": r })).unwrap_or_default();
1360        let response = self
1361            .intent
1362            .client
1363            .raw_json(
1364                Method::POST,
1365                &format!("/_matrix/client/unstable/knock/{encoded}"),
1366                Some(body),
1367            )
1368            .await?;
1369        response
1370            .get("room_id")
1371            .and_then(Value::as_str)
1372            .map(ToOwned::to_owned)
1373            .ok_or_else(|| anyhow::anyhow!("missing room_id in knock response"))
1374    }
1375
1376    pub async fn get_room_hierarchy(&self, room_id: &str) -> anyhow::Result<Value> {
1377        self.intent.ensure_registered().await?;
1378        let encoded = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1379        self.intent
1380            .client
1381            .raw_json(
1382                Method::GET,
1383                &format!("/_matrix/client/unstable/org.matrix.msc3266/rooms/{encoded}/hierarchy"),
1384                None,
1385            )
1386            .await
1387    }
1388}
1389
1390pub const REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE: &str = "io.t2bot.sdk.bot.remote_user_info";
1391pub const REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE: &str = "io.t2bot.sdk.bot.remote_room_info";
1392pub const REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX: &str = "io.t2bot.sdk.bot.remote_user_map";
1393pub const REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX: &str = "io.t2bot.sdk.bot.remote_room_map";
1394
1395#[derive(Clone)]
1396pub struct MatrixBridge {
1397    bot_intent: Intent,
1398    namespace_to_user: Arc<RwLock<HashMap<String, String>>>,
1399}
1400
1401impl MatrixBridge {
1402    pub fn new(bot_intent: Intent) -> Self {
1403        Self {
1404            bot_intent,
1405            namespace_to_user: Arc::new(RwLock::new(HashMap::new())),
1406        }
1407    }
1408
1409    pub async fn map_virtual_user(
1410        &self,
1411        namespace: impl Into<String>,
1412        user_id: impl Into<String>,
1413    ) -> anyhow::Result<()> {
1414        self.namespace_to_user
1415            .write()
1416            .await
1417            .insert(namespace.into(), user_id.into());
1418        Ok(())
1419    }
1420
1421    pub async fn resolve_virtual_user(&self, namespace: &str) -> Option<String> {
1422        self.namespace_to_user.read().await.get(namespace).cloned()
1423    }
1424
1425    /// Gets information about a remote user.
1426    pub async fn get_remote_user_info<T: serde::de::DeserializeOwned>(
1427        &self,
1428        user_intent: &Intent,
1429    ) -> anyhow::Result<T> {
1430        user_intent.ensure_registered().await?;
1431        let value = self
1432            .bot_intent
1433            .underlying_client()
1434            .get_account_data(REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE)
1435            .await?;
1436        serde_json::from_value(value).context("Failed to parse remote user info")
1437    }
1438
1439    /// Sets information about a remote user. Calling this function will map the
1440    /// provided remote user ID to the intent's owner.
1441    pub async fn set_remote_user_info<T: serde::Serialize>(
1442        &self,
1443        user_intent: &Intent,
1444        remote_info: &T,
1445    ) -> anyhow::Result<()> {
1446        user_intent.ensure_registered().await?;
1447        let value = serde_json::to_value(remote_info)?;
1448        let id = value
1449            .get("id")
1450            .and_then(|v| v.as_str())
1451            .ok_or_else(|| anyhow::anyhow!("remote_info must have an 'id' field"))?;
1452        self.bot_intent
1453            .underlying_client()
1454            .set_account_data(REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE, &value)
1455            .await?;
1456        self.update_remote_user_mapping(user_intent.user_id(), id)
1457            .await
1458    }
1459
1460    /// Gets information about a remote room.
1461    pub async fn get_remote_room_info<T: serde::de::DeserializeOwned>(
1462        &self,
1463        matrix_room_id: &str,
1464    ) -> anyhow::Result<T> {
1465        self.bot_intent.ensure_registered().await?;
1466        let value = self
1467            .bot_intent
1468            .underlying_client()
1469            .get_room_account_data(matrix_room_id, REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE)
1470            .await?;
1471        serde_json::from_value(value).context("Failed to parse remote room info")
1472    }
1473
1474    /// Sets information about a remote room. Calling this function will map the
1475    /// provided remote room ID to the matrix room ID.
1476    pub async fn set_remote_room_info<T: serde::Serialize>(
1477        &self,
1478        matrix_room_id: &str,
1479        remote_info: &T,
1480    ) -> anyhow::Result<()> {
1481        self.bot_intent.ensure_registered().await?;
1482        let value = serde_json::to_value(remote_info)?;
1483        let id = value
1484            .get("id")
1485            .and_then(|v| v.as_str())
1486            .ok_or_else(|| anyhow::anyhow!("remote_info must have an 'id' field"))?;
1487        self.bot_intent
1488            .underlying_client()
1489            .set_room_account_data(
1490                matrix_room_id,
1491                REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE,
1492                &value,
1493            )
1494            .await?;
1495        self.update_remote_room_mapping(matrix_room_id, id).await
1496    }
1497
1498    /// Gets the Matrix room ID for the provided remote room ID.
1499    pub async fn get_matrix_room_id_for_remote(
1500        &self,
1501        remote_room_id: &str,
1502    ) -> anyhow::Result<String> {
1503        self.bot_intent.ensure_registered().await?;
1504        let event_type =
1505            format!("{REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_room_id}");
1506        let value = self
1507            .bot_intent
1508            .underlying_client()
1509            .get_account_data(&event_type)
1510            .await?;
1511        value
1512            .get("id")
1513            .and_then(|v| v.as_str())
1514            .map(ToOwned::to_owned)
1515            .ok_or_else(|| anyhow::anyhow!("Missing 'id' in remote room mapping"))
1516    }
1517
1518    /// Gets a Matrix user intent for the provided remote user ID.
1519    pub async fn get_intent_for_remote(&self, remote_user_id: &str) -> anyhow::Result<Intent> {
1520        self.bot_intent.ensure_registered().await?;
1521        let event_type =
1522            format!("{REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_user_id}");
1523        let value = self
1524            .bot_intent
1525            .underlying_client()
1526            .get_account_data(&event_type)
1527            .await?;
1528        let user_id = value
1529            .get("id")
1530            .and_then(|v| v.as_str())
1531            .ok_or_else(|| anyhow::anyhow!("Missing 'id' in remote user mapping"))?;
1532        // Return a new Intent with the same client as bot_intent
1533        Ok(Intent::new(
1534            user_id,
1535            self.bot_intent.underlying_client().clone(),
1536        ))
1537    }
1538
1539    async fn update_remote_user_mapping(
1540        &self,
1541        matrix_user_id: &str,
1542        remote_user_id: &str,
1543    ) -> anyhow::Result<()> {
1544        let event_type =
1545            format!("{REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_user_id}");
1546        self.bot_intent
1547            .underlying_client()
1548            .set_account_data(&event_type, &json!({ "id": matrix_user_id }))
1549            .await
1550    }
1551
1552    async fn update_remote_room_mapping(
1553        &self,
1554        matrix_room_id: &str,
1555        remote_room_id: &str,
1556    ) -> anyhow::Result<()> {
1557        let event_type =
1558            format!("{REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_room_id}");
1559        self.bot_intent
1560            .underlying_client()
1561            .set_account_data(&event_type, &json!({ "id": matrix_room_id }))
1562            .await
1563    }
1564}
1565
1566#[async_trait]
1567pub trait UnstableAppserviceApis: Send + Sync {
1568    async fn unstable_ping(&self) -> anyhow::Result<Value>;
1569
1570    /// Send several historical events into a room (MSC2716).
1571    async fn send_historical_event_batch(
1572        &self,
1573        room_id: &str,
1574        prev_event_id: &str,
1575        events: Vec<Value>,
1576        state_events_at_start: Vec<Value>,
1577        chunk_id: Option<String>,
1578    ) -> anyhow::Result<crate::models::MSC2716BatchSendResponse>;
1579
1580    /// Sends an event to the given room with a given timestamp.
1581    async fn send_event_with_timestamp(
1582        &self,
1583        room_id: &str,
1584        event_type: &str,
1585        content: &Value,
1586        ts: u64,
1587    ) -> anyhow::Result<String>;
1588
1589    /// Sends a state event to the given room with a given timestamp.
1590    async fn send_state_event_with_timestamp(
1591        &self,
1592        room_id: &str,
1593        event_type: &str,
1594        state_key: &str,
1595        content: &Value,
1596        ts: u64,
1597    ) -> anyhow::Result<String>;
1598}
1599
1600#[async_trait]
1601impl UnstableAppserviceApis for Appservice {
1602    async fn unstable_ping(&self) -> anyhow::Result<Value> {
1603        self.client
1604            .raw_json(Method::GET, "/_matrix/app/unstable/ping", None)
1605            .await
1606    }
1607
1608    async fn send_historical_event_batch(
1609        &self,
1610        room_id: &str,
1611        prev_event_id: &str,
1612        events: Vec<Value>,
1613        state_events_at_start: Vec<Value>,
1614        chunk_id: Option<String>,
1615    ) -> anyhow::Result<crate::models::MSC2716BatchSendResponse> {
1616        let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1617        let body = json!({
1618            "prev_event": prev_event_id,
1619            "chunk_id": chunk_id,
1620            "events": events,
1621            "state_events_at_start": state_events_at_start,
1622        });
1623        let response = self
1624            .client
1625            .raw_json(
1626                Method::POST,
1627                &format!("/_matrix/client/unstable/org.matrix.msc2716/rooms/{room_id}/batch_send"),
1628                Some(body),
1629            )
1630            .await?;
1631        serde_json::from_value(response).context("Failed to parse batch send response")
1632    }
1633
1634    async fn send_event_with_timestamp(
1635        &self,
1636        room_id: &str,
1637        event_type: &str,
1638        content: &Value,
1639        ts: u64,
1640    ) -> anyhow::Result<String> {
1641        let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1642        let event_type = utf8_percent_encode(event_type, NON_ALPHANUMERIC).to_string();
1643        let txn_id = format!(
1644            "{}__inc_appts{}",
1645            chrono::Utc::now().timestamp_millis(),
1646            Uuid::new_v4()
1647        );
1648        let txn_id = utf8_percent_encode(&txn_id, NON_ALPHANUMERIC).to_string();
1649        let response = self
1650            .client
1651            .raw_json(
1652                Method::PUT,
1653                &format!(
1654                    "/_matrix/client/v3/rooms/{room_id}/send/{event_type}/{txn_id}?ts={}",
1655                    ts
1656                ),
1657                Some(content.clone()),
1658            )
1659            .await?;
1660        response
1661            .get("event_id")
1662            .and_then(|v| v.as_str())
1663            .map(ToOwned::to_owned)
1664            .ok_or_else(|| anyhow::anyhow!("Missing event_id in response"))
1665    }
1666
1667    async fn send_state_event_with_timestamp(
1668        &self,
1669        room_id: &str,
1670        event_type: &str,
1671        state_key: &str,
1672        content: &Value,
1673        ts: u64,
1674    ) -> anyhow::Result<String> {
1675        let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1676        let event_type = utf8_percent_encode(event_type, NON_ALPHANUMERIC).to_string();
1677        let state_key = utf8_percent_encode(state_key, NON_ALPHANUMERIC).to_string();
1678        let response = self
1679            .client
1680            .raw_json(
1681                Method::PUT,
1682                &format!(
1683                    "/_matrix/client/v3/rooms/{room_id}/state/{event_type}/{state_key}?ts={}",
1684                    ts
1685                ),
1686                Some(content.clone()),
1687            )
1688            .await?;
1689        response
1690            .get("event_id")
1691            .and_then(|v| v.as_str())
1692            .map(ToOwned::to_owned)
1693            .ok_or_else(|| anyhow::anyhow!("Missing event_id in response"))
1694    }
1695}