1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex as StdMutex};
3
4use anyhow::Context;
5use async_trait::async_trait;
6use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
7use regex::Regex;
8use reqwest::Method;
9use salvo::http::StatusCode;
10use salvo::prelude::*;
11use serde::{Deserialize, Serialize};
12use serde_json::{Value, json};
13use tokio::sync::{Mutex as AsyncMutex, RwLock, broadcast};
14use url::{Url, form_urlencoded};
15use uuid::Uuid;
16
17use crate::client::{MatrixAuth, MatrixClient};
18use crate::models::CreateRoom;
19use crate::preprocessors::IPreprocessor;
20
/// Serde-serializable payload shapes for the appservice HTTP endpoints.
pub mod http_responses {
    use serde::{Deserialize, Serialize};

    /// Body with no fields; serializes to an empty JSON object.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub struct EmptyResponse {}

    /// Error body made of a message and an error code.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub struct ErrorResponse {
        /// Description of the failure.
        pub error: String,
        /// Error code string.
        pub errcode: String,
    }
}
33
/// A transaction identifier together with the events it carries.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct AppserviceTransaction {
    /// Identifier of the transaction.
    pub transaction_id: String,
    /// Events of the transaction; an absent key deserializes to an empty list.
    #[serde(default)]
    pub events: Vec<Value>,
}
40
/// Payload that [`Appservice::ping_homeserver`] deserializes from the
/// homeserver's reply.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PingHomeserverResponse {
    /// Duration reported in the reply (the field name suggests milliseconds).
    pub duration_ms: f64,
}
45
/// Callbacks invoked by the appservice HTTP endpoints defined in this file.
///
/// Every method has a default implementation that reports "nothing to do"
/// (`Ok(())`, `Ok(None)` or an empty list), so implementors only override the
/// hooks they care about.
#[async_trait]
pub trait AppserviceHandler: Send + Sync {
    /// Called by `Appservice::handle_transaction` with the transaction id and
    /// the transaction body after preprocessors have run over its events.
    async fn on_transaction(&self, _txn_id: &str, _body: &Value) -> anyhow::Result<()> {
        Ok(())
    }

    /// Called by `user_handler`. `Ok(Some(value))` is rendered as the JSON
    /// response; `Ok(None)` becomes a 404 `USER_DOES_NOT_EXIST` error.
    async fn query_user(&self, _user_id: &str) -> anyhow::Result<Option<Value>> {
        Ok(None)
    }

    /// Called by `room_alias_handler`. A returned value must be a JSON object;
    /// it is used as the room creation options. `Ok(None)` becomes a 404
    /// `ROOM_DOES_NOT_EXIST` error.
    async fn query_room_alias(&self, _room_alias: &str) -> anyhow::Result<Option<Value>> {
        Ok(None)
    }

    /// Called by `keys_claim_handler` with the parsed request body.
    /// `Ok(None)` becomes a 404 `M_UNRECOGNIZED` error.
    async fn query_key_claim(&self, _body: &Value) -> anyhow::Result<Option<Value>> {
        Ok(None)
    }

    /// Called by `keys_query_handler` with the parsed request body.
    /// `Ok(None)` becomes a 404 `M_UNRECOGNIZED` error.
    async fn query_key(&self, _body: &Value) -> anyhow::Result<Option<Value>> {
        Ok(None)
    }

    /// Called by `thirdparty_protocol_handler` for a protocol listed in the
    /// appservice's configured protocols. `Ok(None)` becomes a 404
    /// `M_UNRECOGNIZED` error.
    async fn thirdparty_protocol(&self, _protocol: &str) -> anyhow::Result<Option<Value>> {
        Ok(None)
    }

    /// Third-party user lookup used when the request path carries a protocol.
    /// `_fields` holds the query parameters except `access_token`. An empty
    /// result becomes a 404 `NO_MAPPING_FOUND` error.
    async fn thirdparty_user_remote(
        &self,
        _protocol: &str,
        _fields: &HashMap<String, String>,
    ) -> anyhow::Result<Vec<Value>> {
        Ok(Vec::new())
    }

    /// Third-party user lookup used when no protocol is in the path and the
    /// `userid` query parameter is present.
    async fn thirdparty_user_matrix(&self, _user_id: &str) -> anyhow::Result<Vec<Value>> {
        Ok(Vec::new())
    }

    /// Third-party location lookup used when the request path carries a
    /// protocol. `_fields` holds the query parameters except `access_token`.
    async fn thirdparty_location_remote(
        &self,
        _protocol: &str,
        _fields: &HashMap<String, String>,
    ) -> anyhow::Result<Vec<Value>> {
        Ok(Vec::new())
    }

    /// Third-party location lookup used when no protocol is in the path and
    /// the `alias` query parameter is present.
    async fn thirdparty_location_matrix(&self, _alias: &str) -> anyhow::Result<Vec<Value>> {
        Ok(Vec::new())
    }
}
96
/// Handler that keeps every [`AppserviceHandler`] default; installed by
/// `Appservice::new` until `with_handler` replaces it.
#[derive(Debug, Default)]
pub struct NoopAppserviceHandler;

// No overrides: all trait defaults apply.
#[async_trait]
impl AppserviceHandler for NoopAppserviceHandler {}
102
/// Appservice state shared by the HTTP handlers in this file.
///
/// Cloning is how the value reaches request handlers (`InjectAppservice`
/// clones it into the depot); the `Arc`-wrapped fields are shared between
/// clones.
#[derive(Clone)]
pub struct Appservice {
    /// Token that incoming requests must present (checked by `is_authed`).
    pub homeserver_token: String,
    /// Appservice token supplied at construction; not read by code in this section.
    pub appservice_token: String,
    /// Client used for outgoing homeserver requests.
    pub client: MatrixClient,
    /// Registration id; required by `ping_homeserver`.
    appservice_id: Option<String>,
    /// Explicit server name; when unset it is derived from the client's homeserver URL.
    homeserver_name: Option<String>,
    /// Prefix placed before a suffix when building namespaced user ids.
    user_prefix: Option<String>,
    /// Prefix placed before a suffix when building namespaced aliases.
    alias_prefix: Option<String>,
    /// Compiled patterns tested by `is_namespaced_user`.
    user_namespace_regexes: Vec<Regex>,
    /// Compiled patterns tested by `is_namespaced_alias`.
    alias_namespace_regexes: Vec<Regex>,
    /// Protocol names accepted by the third-party endpoints.
    protocols: Vec<String>,
    /// Callbacks invoked by the HTTP handlers.
    handlers: Arc<dyn AppserviceHandler>,
    /// Preprocessors run over transaction events before `on_transaction`.
    preprocessors: Arc<RwLock<Vec<Arc<dyn IPreprocessor>>>>,
    /// Intents already created, keyed by user id.
    intents_cache: Arc<StdMutex<HashMap<String, Intent>>>,
    /// In-flight transactions; each sender broadcasts the processing outcome.
    pending_transactions: Arc<AsyncMutex<HashMap<String, broadcast::Sender<bool>>>>,
    /// Ids of transactions that were processed successfully.
    completed_transactions: Arc<RwLock<HashSet<String>>>,
    /// Transaction id the next ping callback is expected to carry, if any.
    ping_request: Arc<RwLock<Option<String>>>,
}
122
123impl Default for Appservice {
124 fn default() -> Self {
125 let client = MatrixClient::new(
126 Url::parse("http://localhost").expect("localhost url must be valid"),
127 MatrixAuth::new(""),
128 );
129 Self::new("", "", client)
130 }
131}
132
133impl Appservice {
134 pub fn new(
135 homeserver_token: impl Into<String>,
136 appservice_token: impl Into<String>,
137 client: MatrixClient,
138 ) -> Self {
139 Self {
140 homeserver_token: homeserver_token.into(),
141 appservice_token: appservice_token.into(),
142 client,
143 appservice_id: None,
144 homeserver_name: None,
145 user_prefix: None,
146 alias_prefix: None,
147 user_namespace_regexes: Vec::new(),
148 alias_namespace_regexes: Vec::new(),
149 protocols: Vec::new(),
150 handlers: Arc::new(NoopAppserviceHandler),
151 preprocessors: Arc::new(RwLock::new(Vec::new())),
152 intents_cache: Arc::new(StdMutex::new(HashMap::new())),
153 pending_transactions: Arc::new(AsyncMutex::new(HashMap::new())),
154 completed_transactions: Arc::new(RwLock::new(HashSet::new())),
155 ping_request: Arc::new(RwLock::new(None)),
156 }
157 }
158
159 pub fn with_appservice_id(mut self, appservice_id: impl Into<String>) -> Self {
160 self.appservice_id = Some(appservice_id.into());
161 self
162 }
163
164 pub fn with_homeserver_name(mut self, homeserver_name: impl Into<String>) -> Self {
165 self.homeserver_name = Some(homeserver_name.into());
166 self
167 }
168
169 pub fn with_user_prefix(mut self, user_prefix: impl Into<String>) -> Self {
170 self.user_prefix = Some(user_prefix.into());
171 self
172 }
173
174 pub fn with_alias_prefix(mut self, alias_prefix: impl Into<String>) -> Self {
175 self.alias_prefix = Some(alias_prefix.into());
176 self
177 }
178
179 pub fn with_user_namespace_regexes<I, S>(mut self, regexes: I) -> Self
180 where
181 I: IntoIterator<Item = S>,
182 S: Into<String>,
183 {
184 self.user_namespace_regexes = regexes
185 .into_iter()
186 .filter_map(|regex| Regex::new(®ex.into()).ok())
187 .collect();
188 self
189 }
190
191 pub fn with_alias_namespace_regexes<I, S>(mut self, regexes: I) -> Self
192 where
193 I: IntoIterator<Item = S>,
194 S: Into<String>,
195 {
196 self.alias_namespace_regexes = regexes
197 .into_iter()
198 .filter_map(|regex| Regex::new(®ex.into()).ok())
199 .collect();
200 self
201 }
202
203 pub fn with_protocols<I, S>(mut self, protocols: I) -> Self
204 where
205 I: IntoIterator<Item = S>,
206 S: Into<String>,
207 {
208 self.protocols = protocols.into_iter().map(Into::into).collect();
209 self
210 }
211
212 pub fn with_handler(mut self, handler: Arc<dyn AppserviceHandler>) -> Self {
213 self.handlers = handler;
214 self
215 }
216
217 fn resolved_homeserver_name(&self) -> String {
218 if let Some(name) = &self.homeserver_name {
219 return name.clone();
220 }
221 let host = self.client.homeserver().host_str().unwrap_or("localhost");
222 if let Some(port) = self.client.homeserver().port() {
223 format!("{host}:{port}")
224 } else {
225 host.to_owned()
226 }
227 }
228
229 pub fn get_intent(&self, localpart: &str) -> Intent {
230 self.get_intent_for_user_id(&self.get_user_id(localpart))
231 }
232
233 pub fn get_user_id(&self, localpart: &str) -> String {
234 format!("@{localpart}:{}", self.resolved_homeserver_name())
235 }
236
237 pub fn get_intent_for_suffix(&self, suffix: &str) -> anyhow::Result<Intent> {
238 Ok(self.get_intent_for_user_id(&self.get_user_id_for_suffix(suffix)?))
239 }
240
241 pub fn get_user_id_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
242 let Some(prefix) = &self.user_prefix else {
243 anyhow::bail!("Cannot use get_user_id_for_suffix without a configured user prefix")
244 };
245 Ok(format!(
246 "{prefix}{suffix}:{}",
247 self.resolved_homeserver_name()
248 ))
249 }
250
251 pub fn get_intent_for_user_id(&self, user_id: &str) -> Intent {
252 if let Some(existing) = self
253 .intents_cache
254 .lock()
255 .expect("intent cache lock poisoned")
256 .get(user_id)
257 .cloned()
258 {
259 return existing;
260 }
261
262 let intent = Intent::new(user_id.to_owned(), self.client.clone());
263 self.intents_cache
264 .lock()
265 .expect("intent cache lock poisoned")
266 .insert(user_id.to_owned(), intent.clone());
267 intent
268 }
269
270 pub fn get_suffix_for_user_id(&self, user_id: &str) -> anyhow::Result<Option<String>> {
271 let Some(prefix) = &self.user_prefix else {
272 anyhow::bail!("Cannot use get_suffix_for_user_id without a configured user prefix")
273 };
274 Ok(self.suffix_for_entity(user_id, prefix))
275 }
276
277 pub fn is_namespaced_user(&self, user_id: &str) -> bool {
278 if self
279 .user_namespace_regexes
280 .iter()
281 .any(|regex| regex.is_match(user_id))
282 {
283 return true;
284 }
285 self.user_prefix
286 .as_ref()
287 .and_then(|prefix| self.suffix_for_entity(user_id, prefix))
288 .is_some()
289 }
290
291 pub fn get_alias(&self, localpart: &str) -> String {
292 format!("#{localpart}:{}", self.resolved_homeserver_name())
293 }
294
295 pub fn get_alias_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
296 let Some(prefix) = &self.alias_prefix else {
297 anyhow::bail!("Cannot use get_alias_for_suffix without a configured alias prefix")
298 };
299 Ok(format!(
300 "{prefix}{suffix}:{}",
301 self.resolved_homeserver_name()
302 ))
303 }
304
305 pub fn get_alias_localpart_for_suffix(&self, suffix: &str) -> anyhow::Result<String> {
306 let Some(prefix) = &self.alias_prefix else {
307 anyhow::bail!(
308 "Cannot use get_alias_localpart_for_suffix without a configured alias prefix"
309 )
310 };
311 Ok(format!("{}{suffix}", prefix.trim_start_matches('#')))
312 }
313
314 pub fn get_suffix_for_alias(&self, alias: &str) -> anyhow::Result<Option<String>> {
315 let Some(prefix) = &self.alias_prefix else {
316 anyhow::bail!("Cannot use get_suffix_for_alias without a configured alias prefix")
317 };
318 Ok(self.suffix_for_entity(alias, prefix))
319 }
320
321 pub fn is_namespaced_alias(&self, alias: &str) -> bool {
322 if self
323 .alias_namespace_regexes
324 .iter()
325 .any(|regex| regex.is_match(alias))
326 {
327 return true;
328 }
329 self.alias_prefix
330 .as_ref()
331 .and_then(|prefix| self.suffix_for_entity(alias, prefix))
332 .is_some()
333 }
334
335 pub async fn add_preprocessor(&self, preprocessor: Arc<dyn IPreprocessor>) {
336 self.preprocessors.write().await.push(preprocessor);
337 }
338
339 pub async fn set_room_directory_visibility(
340 &self,
341 network_id: &str,
342 room_id: &str,
343 visibility: &str,
344 ) -> anyhow::Result<Value> {
345 if visibility != "public" && visibility != "private" {
346 anyhow::bail!("visibility must be 'public' or 'private'");
347 }
348 let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
349 let network_id = utf8_percent_encode(network_id, NON_ALPHANUMERIC).to_string();
350 self.client
351 .raw_json(
352 Method::PUT,
353 &format!("/_matrix/client/v3/directory/list/appservice/{network_id}/{room_id}"),
354 Some(json!({ "visibility": visibility })),
355 )
356 .await
357 }
358
359 pub fn router(&self) -> Router {
360 build_router_with_appservice(self.clone())
361 }
362
    /// Start hook. As written here it performs no work and always returns `Ok(())`.
    pub async fn begin(&self) -> anyhow::Result<()> {
        Ok(())
    }
369
    /// Stop hook. The body is empty, so calling it has no effect.
    pub fn stop(&self) {}
372
373 pub async fn expect_ping_transaction(&self, transaction_id: impl Into<String>) {
374 *self.ping_request.write().await = Some(transaction_id.into());
375 }
376
377 pub async fn expected_ping_transaction(&self) -> Option<String> {
378 self.ping_request.read().await.clone()
379 }
380
381 pub async fn ping_homeserver(&self) -> anyhow::Result<PingHomeserverResponse> {
382 let appservice_id = self
383 .appservice_id
384 .as_ref()
385 .context("No `id` given in registration information. Cannot ping homeserver")?;
386 let transaction_id = format!("matrix-bot-sdk_{}", Uuid::new_v4());
387 self.expect_ping_transaction(transaction_id.clone()).await;
388 let endpoint = format!(
389 "/_matrix/client/v1/appservice/{}/ping",
390 utf8_percent_encode(appservice_id, NON_ALPHANUMERIC)
391 );
392 let response = self
393 .client
394 .raw_json(
395 Method::POST,
396 &endpoint,
397 Some(json!({ "transaction_id": transaction_id })),
398 )
399 .await?;
400 serde_json::from_value(response).context("invalid ping response payload")
401 }
402
403 async fn acquire_transaction_slot(&self, txn_id: &str) -> TransactionSlot {
404 if self.completed_transactions.read().await.contains(txn_id) {
405 return TransactionSlot::Completed;
406 }
407 let mut pending = self.pending_transactions.lock().await;
408 if let Some(sender) = pending.get(txn_id) {
409 return TransactionSlot::Pending(sender.subscribe());
410 }
411 let (sender, _receiver) = broadcast::channel(1);
412 pending.insert(txn_id.to_owned(), sender.clone());
413 TransactionSlot::Owner(sender)
414 }
415
416 async fn mark_transaction_completed(&self, txn_id: &str) {
417 self.completed_transactions
418 .write()
419 .await
420 .insert(txn_id.to_owned());
421 }
422
423 async fn clear_pending_transaction(&self, txn_id: &str) {
424 self.pending_transactions.lock().await.remove(txn_id);
425 }
426
427 async fn handle_transaction(&self, txn_id: &str, body: &Value) -> anyhow::Result<()> {
428 let mut processed = body.clone();
429 let preprocessors = self.preprocessors.read().await.clone();
430
431 if !preprocessors.is_empty()
432 && let Some(events) = processed.get_mut("events").and_then(Value::as_array_mut)
433 {
434 for event in events {
435 for preprocessor in &preprocessors {
436 preprocessor.process(event).await?;
437 }
438 }
439 }
440
441 if !preprocessors.is_empty()
442 && let Some(events) = processed
443 .get_mut("de.sorunome.msc2409.ephemeral")
444 .and_then(Value::as_array_mut)
445 {
446 for event in events {
447 for preprocessor in &preprocessors {
448 preprocessor.process(event).await?;
449 }
450 }
451 }
452
453 self.handlers.on_transaction(txn_id, &processed).await
454 }
455
456 fn supports_protocol(&self, protocol: &str) -> bool {
457 self.protocols.iter().any(|p| p == protocol)
458 }
459
460 fn suffix_for_entity(&self, value: &str, prefix: &str) -> Option<String> {
461 let homeserver = self.resolved_homeserver_name();
462 let suffix = format!(":{homeserver}");
463 if !value.starts_with(prefix) || !value.ends_with(&suffix) {
464 return None;
465 }
466 let middle = &value[prefix.len()..value.len().saturating_sub(suffix.len())];
467 if middle.is_empty() {
468 None
469 } else {
470 Some(middle.to_owned())
471 }
472 }
473}
474
475pub fn build_router() -> Router {
476 build_router_with_appservice(Appservice::default())
477}
478
/// Builds the appservice HTTP router.
///
/// `InjectAppservice` is installed as a hoop so each handler can fetch the
/// appservice from the depot. Several endpoints are registered under more
/// than one path (with and without the `_matrix/app/v1` prefix, plus unstable
/// MSC paths) and share the same handler. The `{**rest}` route is pushed last
/// and answers with `M_UNRECOGNIZED`.
pub fn build_router_with_appservice(appservice: Appservice) -> Router {
    Router::new()
        .hoop(InjectAppservice::new(appservice))
        .push(Router::with_path("health").get(health_handler))
        // Un-prefixed variants of the user, room and transaction endpoints.
        .push(Router::with_path("users/{user_id}").get(user_handler))
        .push(Router::with_path("rooms/{room_alias}").get(room_alias_handler))
        .push(Router::with_path("transactions/{txn_id}").put(transaction_handler))
        // `_matrix/app/v1` variants of the same endpoints.
        .push(Router::with_path("_matrix/app/v1/users/{user_id}").get(user_handler))
        .push(Router::with_path("_matrix/app/v1/rooms/{room_alias}").get(room_alias_handler))
        .push(Router::with_path("_matrix/app/v1/transactions/{txn_id}").put(transaction_handler))
        // Third-party lookups, with and without a protocol path segment.
        .push(
            Router::with_path("_matrix/app/v1/thirdparty/protocol/{protocol}")
                .get(thirdparty_protocol_handler),
        )
        .push(
            Router::with_path("_matrix/app/v1/thirdparty/user/{protocol}")
                .get(thirdparty_user_handler),
        )
        .push(Router::with_path("_matrix/app/v1/thirdparty/user").get(thirdparty_user_handler))
        .push(
            Router::with_path("_matrix/app/v1/thirdparty/location/{protocol}")
                .get(thirdparty_location_handler),
        )
        .push(
            Router::with_path("_matrix/app/v1/thirdparty/location")
                .get(thirdparty_location_handler),
        )
        // Key claim / key query endpoints under their unstable MSC paths.
        .push(
            Router::with_path("_matrix/app/unstable/org.matrix.msc3983/keys/claim")
                .post(keys_claim_handler),
        )
        .push(
            Router::with_path("_matrix/app/unstable/org.matrix.msc3984/keys/query")
                .post(keys_query_handler),
        )
        .push(
            Router::with_path("_matrix/app/v1/unstable/org.matrix.msc3983/keys/claim")
                .post(keys_claim_handler),
        )
        .push(Router::with_path("unstable/org.matrix.msc3983/keys/claim").post(keys_claim_handler))
        .push(
            Router::with_path("_matrix/app/v1/unstable/org.matrix.msc3984/keys/query")
                .post(keys_query_handler),
        )
        .push(Router::with_path("unstable/org.matrix.msc3984/keys/query").post(keys_query_handler))
        .push(Router::with_path("_matrix/app/v1/ping").post(ping_handler))
        // Catch-all: anything not matched above.
        .push(Router::with_path("{**rest}").goal(unrecognized_handler))
}
527
/// Middleware that makes an [`Appservice`] available to downstream handlers.
#[derive(Clone)]
struct InjectAppservice {
    /// Value cloned into the depot on every request.
    appservice: Appservice,
}

impl InjectAppservice {
    /// Wraps `appservice` for injection.
    fn new(appservice: Appservice) -> Self {
        Self { appservice }
    }
}

#[handler]
impl InjectAppservice {
    /// Injects a clone of the appservice into the depot, then runs the rest of
    /// the handler chain.
    async fn handle(
        &self,
        req: &mut Request,
        depot: &mut Depot,
        res: &mut Response,
        ctrl: &mut FlowCtrl,
    ) {
        depot.inject(self.appservice.clone());
        ctrl.call_next(req, depot, res).await;
    }
}
552
/// Outcome of `Appservice::acquire_transaction_slot`.
enum TransactionSlot {
    /// The transaction id is already in the completed set.
    Completed,
    /// Another request is processing the transaction; the receiver yields its
    /// success flag.
    Pending(broadcast::Receiver<bool>),
    /// The caller owns processing and reports the success flag through the sender.
    Owner(broadcast::Sender<bool>),
}
558
/// Selects which third-party lookup `handle_thirdparty_object` performs.
#[derive(Copy, Clone)]
enum ThirdpartyObjectKind {
    /// User lookup; the Matrix-side query parameter is `userid`.
    User,
    /// Location lookup; the Matrix-side query parameter is `alias`.
    Location,
}
564
565fn appservice_from_depot(depot: &Depot) -> Option<Appservice> {
566 depot.obtain::<Appservice>().ok().cloned()
567}
568
569fn respond_error(res: &mut Response, status: StatusCode, errcode: &str, error: &str) {
570 res.status_code(status);
571 res.render(Json(json!({
572 "errcode": errcode,
573 "error": error,
574 })));
575}
576
577fn respond_ok_empty(res: &mut Response) {
578 res.status_code(StatusCode::OK);
579 res.render(Json(json!({})));
580}
581
582fn provided_access_token(req: &Request) -> Option<String> {
583 if let Some(auth_header) = req.header::<String>("authorization") {
584 return auth_header.strip_prefix("Bearer ").map(ToOwned::to_owned);
585 }
586 req.query::<String>("access_token")
587}
588
589fn is_authed(appservice: &Appservice, req: &Request) -> bool {
590 provided_access_token(req)
591 .as_deref()
592 .is_some_and(|token| token == appservice.homeserver_token)
593}
594
595fn require_auth(appservice: &Appservice, req: &Request, res: &mut Response) -> bool {
596 if is_authed(appservice, req) {
597 true
598 } else {
599 respond_error(
600 res,
601 StatusCode::UNAUTHORIZED,
602 "AUTH_FAILED",
603 "Authentication failed",
604 );
605 false
606 }
607}
608
609async fn parse_json_object(req: &mut Request, res: &mut Response) -> Option<Value> {
610 let body = match req.parse_json::<Value>().await {
611 Ok(value) => value,
612 Err(_) => {
613 respond_error(res, StatusCode::BAD_REQUEST, "BAD_REQUEST", "Expected JSON");
614 return None;
615 }
616 };
617 if !body.is_object() {
618 respond_error(res, StatusCode::BAD_REQUEST, "BAD_REQUEST", "Expected JSON");
619 return None;
620 }
621 Some(body)
622}
623
624fn query_fields_without_access_token(req: &Request) -> HashMap<String, String> {
625 let mut fields = HashMap::new();
626 let Some(raw_query) = req.uri().query() else {
627 return fields;
628 };
629 for (key, value) in form_urlencoded::parse(raw_query.as_bytes()) {
630 if key == "access_token" {
631 continue;
632 }
633 fields.insert(key.into_owned(), value.into_owned());
634 }
635 fields
636}
637
/// Returns the localpart of a room alias: one leading `#` is removed when
/// present, and everything from the first `:` onward is discarded.
fn room_alias_localpart(room_alias: &str) -> String {
    let without_sigil = match room_alias.strip_prefix('#') {
        Some(rest) => rest,
        None => room_alias,
    };
    let localpart = match without_sigil.split_once(':') {
        Some((before_colon, _server)) => before_colon,
        None => without_sigil,
    };
    String::from(localpart)
}
647
648#[handler]
649async fn health_handler(res: &mut Response) {
650 res.render(Json(json!({"ok": true})));
651}
652
653#[handler]
654async fn unrecognized_handler(res: &mut Response) {
655 respond_error(
656 res,
657 StatusCode::NOT_FOUND,
658 "M_UNRECOGNIZED",
659 "Endpoint not implemented",
660 );
661}
662
/// Handles a transaction `PUT`.
///
/// After authentication the body must be a JSON object with an `events`
/// array. Processing is deduplicated per transaction id: an already completed
/// id is acknowledged immediately, a request arriving while another one is
/// processing the same id waits for that outcome, and otherwise this request
/// processes the transaction itself and broadcasts the outcome.
#[handler]
async fn transaction_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
    let Some(appservice) = appservice_from_depot(depot) else {
        respond_error(
            res,
            StatusCode::INTERNAL_SERVER_ERROR,
            "M_UNKNOWN",
            "Appservice state unavailable",
        );
        return;
    };
    if !require_auth(&appservice, req, res) {
        return;
    }

    let Some(body) = parse_json_object(req, res).await else {
        return;
    };
    if body.get("events").and_then(Value::as_array).is_none() {
        respond_error(
            res,
            StatusCode::BAD_REQUEST,
            "BAD_REQUEST",
            "Invalid JSON: expected events",
        );
        return;
    }

    // A missing path parameter falls back to the empty string.
    let txn_id = req.param::<String>("txn_id").unwrap_or_default();
    match appservice.acquire_transaction_slot(&txn_id).await {
        TransactionSlot::Completed => {
            respond_ok_empty(res);
        }
        // Wait for the owning request; anything but an explicit success
        // (a `false` outcome or a receive error) is reported as a 500.
        TransactionSlot::Pending(mut receiver) => match receiver.recv().await {
            Ok(true) => respond_ok_empty(res),
            _ => {
                res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
                res.render(Json(json!({})));
            }
        },
        TransactionSlot::Owner(sender) => {
            let success = appservice.handle_transaction(&txn_id, &body).await.is_ok();
            if success {
                // Recorded before the broadcast and before the pending entry is removed.
                appservice.mark_transaction_completed(&txn_id).await;
                respond_ok_empty(res);
            } else {
                res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
                res.render(Json(json!({})));
            }
            // A send error (no waiting receivers) is deliberately ignored.
            let _ = sender.send(success);
            // NOTE(review): if this future is dropped before the next line
            // runs, the pending entry appears to stay registered — confirm
            // whether request cancellation can occur here.
            appservice.clear_pending_transaction(&txn_id).await;
        }
    }
}
717
718#[handler]
719async fn user_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
720 let Some(appservice) = appservice_from_depot(depot) else {
721 respond_error(
722 res,
723 StatusCode::INTERNAL_SERVER_ERROR,
724 "M_UNKNOWN",
725 "Appservice state unavailable",
726 );
727 return;
728 };
729 if !require_auth(&appservice, req, res) {
730 return;
731 }
732
733 let user_id = req.param::<String>("user_id").unwrap_or_default();
734 match appservice.handlers.query_user(&user_id).await {
735 Ok(Some(result)) => {
736 res.status_code(StatusCode::OK);
737 res.render(Json(result));
738 }
739 Ok(None) => {
740 respond_error(
741 res,
742 StatusCode::NOT_FOUND,
743 "USER_DOES_NOT_EXIST",
744 "User not created",
745 );
746 }
747 Err(_) => {
748 respond_error(
749 res,
750 StatusCode::INTERNAL_SERVER_ERROR,
751 "M_UNKNOWN",
752 "Failed to query user",
753 );
754 }
755 }
756}
757
758#[handler]
759async fn room_alias_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
760 let Some(appservice) = appservice_from_depot(depot) else {
761 respond_error(
762 res,
763 StatusCode::INTERNAL_SERVER_ERROR,
764 "M_UNKNOWN",
765 "Appservice state unavailable",
766 );
767 return;
768 };
769 if !require_auth(&appservice, req, res) {
770 return;
771 }
772
773 let room_alias = req.param::<String>("room_alias").unwrap_or_default();
774 let result = match appservice.handlers.query_room_alias(&room_alias).await {
775 Ok(value) => value,
776 Err(_) => {
777 respond_error(
778 res,
779 StatusCode::INTERNAL_SERVER_ERROR,
780 "M_UNKNOWN",
781 "Failed to query room alias",
782 );
783 return;
784 }
785 };
786 let Some(value) = result else {
787 respond_error(
788 res,
789 StatusCode::NOT_FOUND,
790 "ROOM_DOES_NOT_EXIST",
791 "Room not created",
792 );
793 return;
794 };
795
796 let mut payload = match value {
797 Value::Object(map) => map,
798 _ => {
799 respond_error(
800 res,
801 StatusCode::INTERNAL_SERVER_ERROR,
802 "M_UNKNOWN",
803 "Room query handler must return an object",
804 );
805 return;
806 }
807 };
808 payload.insert(
809 "room_alias_name".to_owned(),
810 Value::String(room_alias_localpart(&room_alias)),
811 );
812 let create_room: CreateRoom = match serde_json::from_value(Value::Object(payload.clone())) {
813 Ok(options) => options,
814 Err(_) => {
815 respond_error(
816 res,
817 StatusCode::INTERNAL_SERVER_ERROR,
818 "M_UNKNOWN",
819 "Invalid room create options",
820 );
821 return;
822 }
823 };
824 match appservice.client.create_room(&create_room).await {
825 Ok(room_id) => {
826 payload.insert("__roomId".to_owned(), Value::String(room_id));
827 res.status_code(StatusCode::OK);
828 res.render(Json(Value::Object(payload)));
829 }
830 Err(_) => {
831 respond_error(
832 res,
833 StatusCode::INTERNAL_SERVER_ERROR,
834 "M_UNKNOWN",
835 "Failed to create room",
836 );
837 }
838 }
839}
840
841#[handler]
842async fn keys_claim_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
843 let Some(appservice) = appservice_from_depot(depot) else {
844 respond_error(
845 res,
846 StatusCode::INTERNAL_SERVER_ERROR,
847 "M_UNKNOWN",
848 "Appservice state unavailable",
849 );
850 return;
851 };
852 if !require_auth(&appservice, req, res) {
853 return;
854 }
855 let Some(body) = parse_json_object(req, res).await else {
856 return;
857 };
858 match appservice.handlers.query_key_claim(&body).await {
859 Ok(Some(result)) => {
860 res.status_code(StatusCode::OK);
861 res.render(Json(result));
862 }
863 Ok(None) => {
864 respond_error(
865 res,
866 StatusCode::NOT_FOUND,
867 "M_UNRECOGNIZED",
868 "Endpoint not implemented",
869 );
870 }
871 Err(_) => {
872 respond_error(
873 res,
874 StatusCode::INTERNAL_SERVER_ERROR,
875 "M_UNKNOWN",
876 "Error handling key claim API",
877 );
878 }
879 }
880}
881
882#[handler]
883async fn keys_query_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
884 let Some(appservice) = appservice_from_depot(depot) else {
885 respond_error(
886 res,
887 StatusCode::INTERNAL_SERVER_ERROR,
888 "M_UNKNOWN",
889 "Appservice state unavailable",
890 );
891 return;
892 };
893 if !require_auth(&appservice, req, res) {
894 return;
895 }
896 let Some(body) = parse_json_object(req, res).await else {
897 return;
898 };
899 match appservice.handlers.query_key(&body).await {
900 Ok(Some(result)) => {
901 res.status_code(StatusCode::OK);
902 res.render(Json(result));
903 }
904 Ok(None) => {
905 respond_error(
906 res,
907 StatusCode::NOT_FOUND,
908 "M_UNRECOGNIZED",
909 "Endpoint not implemented",
910 );
911 }
912 Err(_) => {
913 respond_error(
914 res,
915 StatusCode::INTERNAL_SERVER_ERROR,
916 "M_UNKNOWN",
917 "Error handling key query API",
918 );
919 }
920 }
921}
922
923#[handler]
924async fn thirdparty_protocol_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
925 let Some(appservice) = appservice_from_depot(depot) else {
926 respond_error(
927 res,
928 StatusCode::INTERNAL_SERVER_ERROR,
929 "M_UNKNOWN",
930 "Appservice state unavailable",
931 );
932 return;
933 };
934 if !require_auth(&appservice, req, res) {
935 return;
936 }
937
938 let protocol = req.param::<String>("protocol").unwrap_or_default();
939 if !appservice.supports_protocol(&protocol) {
940 respond_error(
941 res,
942 StatusCode::NOT_FOUND,
943 "PROTOCOL_NOT_HANDLED",
944 "Protocol is not handled by this appservice",
945 );
946 return;
947 }
948 match appservice.handlers.thirdparty_protocol(&protocol).await {
949 Ok(Some(result)) => {
950 res.status_code(StatusCode::OK);
951 res.render(Json(result));
952 }
953 Ok(None) => {
954 respond_error(
955 res,
956 StatusCode::NOT_FOUND,
957 "M_UNRECOGNIZED",
958 "Endpoint not implemented",
959 );
960 }
961 Err(_) => {
962 respond_error(
963 res,
964 StatusCode::INTERNAL_SERVER_ERROR,
965 "M_UNKNOWN",
966 "Failed to query third-party protocol",
967 );
968 }
969 }
970}
971
972async fn handle_thirdparty_object(
973 req: &Request,
974 res: &mut Response,
975 appservice: &Appservice,
976 kind: ThirdpartyObjectKind,
977) {
978 let protocol = req.param::<String>("protocol");
979 let matrix_id = match kind {
980 ThirdpartyObjectKind::User => req.query::<String>("userid"),
981 ThirdpartyObjectKind::Location => req.query::<String>("alias"),
982 };
983 let response_items = if let Some(protocol) = protocol {
984 if !appservice.supports_protocol(&protocol) {
985 respond_error(
986 res,
987 StatusCode::NOT_FOUND,
988 "PROTOCOL_NOT_HANDLED",
989 "Protocol is not handled by this appservice",
990 );
991 return;
992 }
993 let fields = query_fields_without_access_token(req);
994 match kind {
995 ThirdpartyObjectKind::User => {
996 appservice
997 .handlers
998 .thirdparty_user_remote(&protocol, &fields)
999 .await
1000 }
1001 ThirdpartyObjectKind::Location => {
1002 appservice
1003 .handlers
1004 .thirdparty_location_remote(&protocol, &fields)
1005 .await
1006 }
1007 }
1008 } else if let Some(matrix_id) = matrix_id {
1009 match kind {
1010 ThirdpartyObjectKind::User => {
1011 appservice.handlers.thirdparty_user_matrix(&matrix_id).await
1012 }
1013 ThirdpartyObjectKind::Location => {
1014 appservice
1015 .handlers
1016 .thirdparty_location_matrix(&matrix_id)
1017 .await
1018 }
1019 }
1020 } else {
1021 respond_error(
1022 res,
1023 StatusCode::BAD_REQUEST,
1024 "INVALID_PARAMETERS",
1025 "Invalid parameters given",
1026 );
1027 return;
1028 };
1029
1030 match response_items {
1031 Ok(items) if items.is_empty() => {
1032 respond_error(
1033 res,
1034 StatusCode::NOT_FOUND,
1035 "NO_MAPPING_FOUND",
1036 "No mappings found",
1037 );
1038 }
1039 Ok(items) => {
1040 res.status_code(StatusCode::OK);
1041 res.render(Json(Value::Array(items)));
1042 }
1043 Err(_) => {
1044 respond_error(
1045 res,
1046 StatusCode::INTERNAL_SERVER_ERROR,
1047 "M_UNKNOWN",
1048 "Failed to process third-party lookup",
1049 );
1050 }
1051 }
1052}
1053
1054#[handler]
1055async fn thirdparty_user_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1056 let Some(appservice) = appservice_from_depot(depot) else {
1057 respond_error(
1058 res,
1059 StatusCode::INTERNAL_SERVER_ERROR,
1060 "M_UNKNOWN",
1061 "Appservice state unavailable",
1062 );
1063 return;
1064 };
1065 if !require_auth(&appservice, req, res) {
1066 return;
1067 }
1068 handle_thirdparty_object(req, res, &appservice, ThirdpartyObjectKind::User).await;
1069}
1070
1071#[handler]
1072async fn thirdparty_location_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1073 let Some(appservice) = appservice_from_depot(depot) else {
1074 respond_error(
1075 res,
1076 StatusCode::INTERNAL_SERVER_ERROR,
1077 "M_UNKNOWN",
1078 "Appservice state unavailable",
1079 );
1080 return;
1081 };
1082 if !require_auth(&appservice, req, res) {
1083 return;
1084 }
1085 handle_thirdparty_object(req, res, &appservice, ThirdpartyObjectKind::Location).await;
1086}
1087
1088#[handler]
1089async fn ping_handler(req: &mut Request, depot: &mut Depot, res: &mut Response) {
1090 let Some(appservice) = appservice_from_depot(depot) else {
1091 respond_error(
1092 res,
1093 StatusCode::INTERNAL_SERVER_ERROR,
1094 "M_UNKNOWN",
1095 "Appservice state unavailable",
1096 );
1097 return;
1098 };
1099 if !require_auth(&appservice, req, res) {
1100 return;
1101 }
1102 let Some(body) = parse_json_object(req, res).await else {
1103 return;
1104 };
1105
1106 let Some(expected) = appservice.expected_ping_transaction().await else {
1107 respond_error(
1108 res,
1109 StatusCode::BAD_REQUEST,
1110 "BAD_REQUEST",
1111 "No ping request expected",
1112 );
1113 return;
1114 };
1115 let transaction_id = body
1116 .get("transaction_id")
1117 .and_then(Value::as_str)
1118 .unwrap_or_default();
1119 if transaction_id != expected {
1120 respond_error(
1121 res,
1122 StatusCode::BAD_REQUEST,
1123 "BAD_REQUEST",
1124 "transaction_id did not match",
1125 );
1126 return;
1127 }
1128 *appservice.ping_request.write().await = None;
1129 respond_ok_empty(res);
1130}
1131
/// Per-user helper that ensures a user is registered (and joined) before
/// acting through the wrapped client.
#[derive(Clone)]
pub struct Intent {
    /// Full user id this intent acts for.
    user_id: String,
    /// Client used for the outgoing requests.
    client: MatrixClient,
    /// Optional store consulted and updated by `ensure_registered`.
    storage: Option<Arc<dyn crate::storage::IAppserviceStorageProvider>>,
}
1138
1139impl Intent {
1140 pub fn new(user_id: impl Into<String>, client: MatrixClient) -> Self {
1141 Self {
1142 user_id: user_id.into(),
1143 client,
1144 storage: None,
1145 }
1146 }
1147
1148 pub fn with_storage(
1149 mut self,
1150 storage: Arc<dyn crate::storage::IAppserviceStorageProvider>,
1151 ) -> Self {
1152 self.storage = Some(storage);
1153 self
1154 }
1155
1156 pub fn user_id(&self) -> &str {
1157 &self.user_id
1158 }
1159
1160 pub fn underlying_client(&self) -> &MatrixClient {
1161 &self.client
1162 }
1163
1164 pub async fn ensure_registered(&self) -> anyhow::Result<()> {
1165 if let Some(storage) = &self.storage
1166 && storage.is_user_registered(&self.user_id).await?
1167 {
1168 return Ok(());
1169 }
1170
1171 let localpart = self
1172 .user_id
1173 .strip_prefix('@')
1174 .and_then(|s| s.split(':').next())
1175 .unwrap_or(&self.user_id);
1176
1177 let result = self
1178 .client
1179 .raw_json(
1180 Method::POST,
1181 "/_matrix/client/v3/register",
1182 Some(json!({
1183 "type": "m.login.application_service",
1184 "username": localpart,
1185 "inhibit_login": true,
1186 })),
1187 )
1188 .await;
1189
1190 match result {
1191 Ok(_) => {
1192 if let Some(storage) = &self.storage {
1193 storage.register_user(&self.user_id).await?;
1194 }
1195 Ok(())
1196 }
1197 Err(e) => {
1198 let err_str = e.to_string();
1199 if err_str.contains("M_USER_IN_USE") {
1200 if let Some(storage) = &self.storage {
1201 storage.register_user(&self.user_id).await?;
1202 }
1203 Ok(())
1204 } else {
1205 Err(e)
1206 }
1207 }
1208 }
1209 }
1210
1211 pub async fn ensure_joined(&self, room_id_or_alias: &str) -> anyhow::Result<String> {
1212 self.ensure_registered().await?;
1213 self.client.join_room(room_id_or_alias).await
1214 }
1215
1216 pub async fn ensure_registered_and_joined(
1217 &self,
1218 room_id_or_alias: &str,
1219 ) -> anyhow::Result<String> {
1220 self.ensure_registered().await?;
1221 self.ensure_joined(room_id_or_alias).await
1222 }
1223
1224 pub async fn get_joined_rooms(&self) -> anyhow::Result<Vec<String>> {
1225 self.ensure_registered().await?;
1226 self.client.get_joined_rooms().await
1227 }
1228
1229 pub async fn refresh_joined_rooms(&self) -> anyhow::Result<Vec<String>> {
1230 self.get_joined_rooms().await
1231 }
1232
1233 pub async fn join_room(&self, room_id_or_alias: &str) -> anyhow::Result<String> {
1234 self.ensure_registered().await?;
1235 self.client.join_room(room_id_or_alias).await
1236 }
1237
1238 pub async fn leave_room(&self, room_id: &str, reason: Option<&str>) -> anyhow::Result<()> {
1239 self.ensure_registered().await?;
1240 self.client.leave_room(room_id, reason).await
1241 }
1242
1243 pub async fn send_text(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1244 self.ensure_registered_and_joined(room_id).await?;
1245 self.client.send_text(room_id, body).await
1246 }
1247
1248 pub async fn send_notice(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1249 self.ensure_registered_and_joined(room_id).await?;
1250 self.client.send_notice(room_id, body).await
1251 }
1252
1253 pub async fn send_event(
1254 &self,
1255 room_id: &str,
1256 event_type: &str,
1257 content: &Value,
1258 ) -> anyhow::Result<String> {
1259 self.ensure_registered_and_joined(room_id).await?;
1260 self.client.send_event(room_id, event_type, content).await
1261 }
1262
1263 pub async fn send_state_event(
1264 &self,
1265 room_id: &str,
1266 event_type: &str,
1267 state_key: &str,
1268 content: &Value,
1269 ) -> anyhow::Result<String> {
1270 self.ensure_registered_and_joined(room_id).await?;
1271 self.client
1272 .send_state_event(room_id, event_type, state_key, content)
1273 .await
1274 }
1275
1276 pub async fn send_message(&self, room_id: &str, body: &str) -> anyhow::Result<String> {
1277 self.send_text(room_id, body).await
1278 }
1279
1280 pub async fn invite_user(&self, user_id: &str, room_id: &str) -> anyhow::Result<()> {
1281 self.ensure_registered().await?;
1282 self.client.invite_user(user_id, room_id).await
1283 }
1284
1285 pub async fn kick_user(
1286 &self,
1287 user_id: &str,
1288 room_id: &str,
1289 reason: Option<&str>,
1290 ) -> anyhow::Result<()> {
1291 self.ensure_registered().await?;
1292 self.client.kick_user(user_id, room_id, reason).await
1293 }
1294
1295 pub async fn ban_user(
1296 &self,
1297 user_id: &str,
1298 room_id: &str,
1299 reason: Option<&str>,
1300 ) -> anyhow::Result<()> {
1301 self.ensure_registered().await?;
1302 self.client.ban_user(user_id, room_id, reason).await
1303 }
1304
1305 pub async fn unban_user(&self, user_id: &str, room_id: &str) -> anyhow::Result<()> {
1306 self.ensure_registered().await?;
1307 self.client.unban_user(user_id, room_id).await
1308 }
1309
1310 pub async fn redact_event(
1311 &self,
1312 room_id: &str,
1313 event_id: &str,
1314 reason: Option<&str>,
1315 ) -> anyhow::Result<String> {
1316 self.ensure_registered().await?;
1317 self.client.redact_event(room_id, event_id, reason).await
1318 }
1319
1320 pub async fn get_room_members(
1321 &self,
1322 room_id: &str,
1323 ) -> anyhow::Result<Vec<crate::models::events::MembershipEvent>> {
1324 self.ensure_registered().await?;
1325 self.client.get_room_members(room_id, None, None).await
1326 }
1327
1328 pub async fn enable_encryption(&self, room_id: &str) -> anyhow::Result<()> {
1329 self.ensure_registered_and_joined(room_id).await?;
1330 let content = json!({
1331 "algorithm": "m.megolm.v1.aes-sha2"
1332 });
1333 self.client
1334 .send_state_event(room_id, "m.room.encryption", "", &content)
1335 .await?;
1336 Ok(())
1337 }
1338
1339 pub fn unstable_apis(&self) -> UnstableIntentApis {
1340 UnstableIntentApis {
1341 intent: self.clone(),
1342 }
1343 }
1344}
1345
1346#[derive(Clone)]
1347pub struct UnstableIntentApis {
1348 intent: Intent,
1349}
1350
1351impl UnstableIntentApis {
1352 pub async fn knock_room(
1353 &self,
1354 room_id_or_alias: &str,
1355 reason: Option<&str>,
1356 ) -> anyhow::Result<String> {
1357 self.intent.ensure_registered().await?;
1358 let encoded = utf8_percent_encode(room_id_or_alias, NON_ALPHANUMERIC).to_string();
1359 let body = reason.map(|r| json!({ "reason": r })).unwrap_or_default();
1360 let response = self
1361 .intent
1362 .client
1363 .raw_json(
1364 Method::POST,
1365 &format!("/_matrix/client/unstable/knock/{encoded}"),
1366 Some(body),
1367 )
1368 .await?;
1369 response
1370 .get("room_id")
1371 .and_then(Value::as_str)
1372 .map(ToOwned::to_owned)
1373 .ok_or_else(|| anyhow::anyhow!("missing room_id in knock response"))
1374 }
1375
1376 pub async fn get_room_hierarchy(&self, room_id: &str) -> anyhow::Result<Value> {
1377 self.intent.ensure_registered().await?;
1378 let encoded = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1379 self.intent
1380 .client
1381 .raw_json(
1382 Method::GET,
1383 &format!("/_matrix/client/unstable/org.matrix.msc3266/rooms/{encoded}/hierarchy"),
1384 None,
1385 )
1386 .await
1387 }
1388}
1389
/// Account data event type holding the remote-network info of a bridged user.
pub const REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE: &str = "io.t2bot.sdk.bot.remote_user_info";

/// Room account data event type holding the remote-network info of a bridged room.
pub const REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE: &str = "io.t2bot.sdk.bot.remote_room_info";

/// Prefix of the account data event types mapping a remote user to a Matrix user; the full type
/// is `<prefix>.<remote user id>`.
pub const REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX: &str = "io.t2bot.sdk.bot.remote_user_map";

/// Prefix of the account data event types mapping a remote room to a Matrix room; the full type
/// is `<prefix>.<remote room id>`.
pub const REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX: &str = "io.t2bot.sdk.bot.remote_room_map";
1394
1395#[derive(Clone)]
1396pub struct MatrixBridge {
1397 bot_intent: Intent,
1398 namespace_to_user: Arc<RwLock<HashMap<String, String>>>,
1399}
1400
1401impl MatrixBridge {
1402 pub fn new(bot_intent: Intent) -> Self {
1403 Self {
1404 bot_intent,
1405 namespace_to_user: Arc::new(RwLock::new(HashMap::new())),
1406 }
1407 }
1408
1409 pub async fn map_virtual_user(
1410 &self,
1411 namespace: impl Into<String>,
1412 user_id: impl Into<String>,
1413 ) -> anyhow::Result<()> {
1414 self.namespace_to_user
1415 .write()
1416 .await
1417 .insert(namespace.into(), user_id.into());
1418 Ok(())
1419 }
1420
1421 pub async fn resolve_virtual_user(&self, namespace: &str) -> Option<String> {
1422 self.namespace_to_user.read().await.get(namespace).cloned()
1423 }
1424
1425 pub async fn get_remote_user_info<T: serde::de::DeserializeOwned>(
1427 &self,
1428 user_intent: &Intent,
1429 ) -> anyhow::Result<T> {
1430 user_intent.ensure_registered().await?;
1431 let value = self
1432 .bot_intent
1433 .underlying_client()
1434 .get_account_data(REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE)
1435 .await?;
1436 serde_json::from_value(value).context("Failed to parse remote user info")
1437 }
1438
1439 pub async fn set_remote_user_info<T: serde::Serialize>(
1442 &self,
1443 user_intent: &Intent,
1444 remote_info: &T,
1445 ) -> anyhow::Result<()> {
1446 user_intent.ensure_registered().await?;
1447 let value = serde_json::to_value(remote_info)?;
1448 let id = value
1449 .get("id")
1450 .and_then(|v| v.as_str())
1451 .ok_or_else(|| anyhow::anyhow!("remote_info must have an 'id' field"))?;
1452 self.bot_intent
1453 .underlying_client()
1454 .set_account_data(REMOTE_USER_INFO_ACCOUNT_DATA_EVENT_TYPE, &value)
1455 .await?;
1456 self.update_remote_user_mapping(user_intent.user_id(), id)
1457 .await
1458 }
1459
1460 pub async fn get_remote_room_info<T: serde::de::DeserializeOwned>(
1462 &self,
1463 matrix_room_id: &str,
1464 ) -> anyhow::Result<T> {
1465 self.bot_intent.ensure_registered().await?;
1466 let value = self
1467 .bot_intent
1468 .underlying_client()
1469 .get_room_account_data(matrix_room_id, REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE)
1470 .await?;
1471 serde_json::from_value(value).context("Failed to parse remote room info")
1472 }
1473
1474 pub async fn set_remote_room_info<T: serde::Serialize>(
1477 &self,
1478 matrix_room_id: &str,
1479 remote_info: &T,
1480 ) -> anyhow::Result<()> {
1481 self.bot_intent.ensure_registered().await?;
1482 let value = serde_json::to_value(remote_info)?;
1483 let id = value
1484 .get("id")
1485 .and_then(|v| v.as_str())
1486 .ok_or_else(|| anyhow::anyhow!("remote_info must have an 'id' field"))?;
1487 self.bot_intent
1488 .underlying_client()
1489 .set_room_account_data(
1490 matrix_room_id,
1491 REMOTE_ROOM_INFO_ACCOUNT_DATA_EVENT_TYPE,
1492 &value,
1493 )
1494 .await?;
1495 self.update_remote_room_mapping(matrix_room_id, id).await
1496 }
1497
1498 pub async fn get_matrix_room_id_for_remote(
1500 &self,
1501 remote_room_id: &str,
1502 ) -> anyhow::Result<String> {
1503 self.bot_intent.ensure_registered().await?;
1504 let event_type =
1505 format!("{REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_room_id}");
1506 let value = self
1507 .bot_intent
1508 .underlying_client()
1509 .get_account_data(&event_type)
1510 .await?;
1511 value
1512 .get("id")
1513 .and_then(|v| v.as_str())
1514 .map(ToOwned::to_owned)
1515 .ok_or_else(|| anyhow::anyhow!("Missing 'id' in remote room mapping"))
1516 }
1517
1518 pub async fn get_intent_for_remote(&self, remote_user_id: &str) -> anyhow::Result<Intent> {
1520 self.bot_intent.ensure_registered().await?;
1521 let event_type =
1522 format!("{REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_user_id}");
1523 let value = self
1524 .bot_intent
1525 .underlying_client()
1526 .get_account_data(&event_type)
1527 .await?;
1528 let user_id = value
1529 .get("id")
1530 .and_then(|v| v.as_str())
1531 .ok_or_else(|| anyhow::anyhow!("Missing 'id' in remote user mapping"))?;
1532 Ok(Intent::new(
1534 user_id,
1535 self.bot_intent.underlying_client().clone(),
1536 ))
1537 }
1538
1539 async fn update_remote_user_mapping(
1540 &self,
1541 matrix_user_id: &str,
1542 remote_user_id: &str,
1543 ) -> anyhow::Result<()> {
1544 let event_type =
1545 format!("{REMOTE_USER_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_user_id}");
1546 self.bot_intent
1547 .underlying_client()
1548 .set_account_data(&event_type, &json!({ "id": matrix_user_id }))
1549 .await
1550 }
1551
1552 async fn update_remote_room_mapping(
1553 &self,
1554 matrix_room_id: &str,
1555 remote_room_id: &str,
1556 ) -> anyhow::Result<()> {
1557 let event_type =
1558 format!("{REMOTE_ROOM_MAP_ACCOUNT_DATA_EVENT_TYPE_PREFIX}.{remote_room_id}");
1559 self.bot_intent
1560 .underlying_client()
1561 .set_account_data(&event_type, &json!({ "id": matrix_room_id }))
1562 .await
1563 }
1564}
1565
1566#[async_trait]
1567pub trait UnstableAppserviceApis: Send + Sync {
1568 async fn unstable_ping(&self) -> anyhow::Result<Value>;
1569
1570 async fn send_historical_event_batch(
1572 &self,
1573 room_id: &str,
1574 prev_event_id: &str,
1575 events: Vec<Value>,
1576 state_events_at_start: Vec<Value>,
1577 chunk_id: Option<String>,
1578 ) -> anyhow::Result<crate::models::MSC2716BatchSendResponse>;
1579
1580 async fn send_event_with_timestamp(
1582 &self,
1583 room_id: &str,
1584 event_type: &str,
1585 content: &Value,
1586 ts: u64,
1587 ) -> anyhow::Result<String>;
1588
1589 async fn send_state_event_with_timestamp(
1591 &self,
1592 room_id: &str,
1593 event_type: &str,
1594 state_key: &str,
1595 content: &Value,
1596 ts: u64,
1597 ) -> anyhow::Result<String>;
1598}
1599
1600#[async_trait]
1601impl UnstableAppserviceApis for Appservice {
1602 async fn unstable_ping(&self) -> anyhow::Result<Value> {
1603 self.client
1604 .raw_json(Method::GET, "/_matrix/app/unstable/ping", None)
1605 .await
1606 }
1607
1608 async fn send_historical_event_batch(
1609 &self,
1610 room_id: &str,
1611 prev_event_id: &str,
1612 events: Vec<Value>,
1613 state_events_at_start: Vec<Value>,
1614 chunk_id: Option<String>,
1615 ) -> anyhow::Result<crate::models::MSC2716BatchSendResponse> {
1616 let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1617 let body = json!({
1618 "prev_event": prev_event_id,
1619 "chunk_id": chunk_id,
1620 "events": events,
1621 "state_events_at_start": state_events_at_start,
1622 });
1623 let response = self
1624 .client
1625 .raw_json(
1626 Method::POST,
1627 &format!("/_matrix/client/unstable/org.matrix.msc2716/rooms/{room_id}/batch_send"),
1628 Some(body),
1629 )
1630 .await?;
1631 serde_json::from_value(response).context("Failed to parse batch send response")
1632 }
1633
1634 async fn send_event_with_timestamp(
1635 &self,
1636 room_id: &str,
1637 event_type: &str,
1638 content: &Value,
1639 ts: u64,
1640 ) -> anyhow::Result<String> {
1641 let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1642 let event_type = utf8_percent_encode(event_type, NON_ALPHANUMERIC).to_string();
1643 let txn_id = format!(
1644 "{}__inc_appts{}",
1645 chrono::Utc::now().timestamp_millis(),
1646 Uuid::new_v4()
1647 );
1648 let txn_id = utf8_percent_encode(&txn_id, NON_ALPHANUMERIC).to_string();
1649 let response = self
1650 .client
1651 .raw_json(
1652 Method::PUT,
1653 &format!(
1654 "/_matrix/client/v3/rooms/{room_id}/send/{event_type}/{txn_id}?ts={}",
1655 ts
1656 ),
1657 Some(content.clone()),
1658 )
1659 .await?;
1660 response
1661 .get("event_id")
1662 .and_then(|v| v.as_str())
1663 .map(ToOwned::to_owned)
1664 .ok_or_else(|| anyhow::anyhow!("Missing event_id in response"))
1665 }
1666
1667 async fn send_state_event_with_timestamp(
1668 &self,
1669 room_id: &str,
1670 event_type: &str,
1671 state_key: &str,
1672 content: &Value,
1673 ts: u64,
1674 ) -> anyhow::Result<String> {
1675 let room_id = utf8_percent_encode(room_id, NON_ALPHANUMERIC).to_string();
1676 let event_type = utf8_percent_encode(event_type, NON_ALPHANUMERIC).to_string();
1677 let state_key = utf8_percent_encode(state_key, NON_ALPHANUMERIC).to_string();
1678 let response = self
1679 .client
1680 .raw_json(
1681 Method::PUT,
1682 &format!(
1683 "/_matrix/client/v3/rooms/{room_id}/state/{event_type}/{state_key}?ts={}",
1684 ts
1685 ),
1686 Some(content.clone()),
1687 )
1688 .await?;
1689 response
1690 .get("event_id")
1691 .and_then(|v| v.as_str())
1692 .map(ToOwned::to_owned)
1693 .ok_or_else(|| anyhow::anyhow!("Missing event_id in response"))
1694 }
1695}