public_appservice/
appservice.rs

1use crate::config::Config;
2use futures::future::join_all;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::Semaphore;
6
7use ruma::{
8    OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomAliasId, UserId,
9    api::client::{
10        account::whoami,
11        alias::get_alias,
12        appservice::request_ping,
13        membership::{join_room_by_id, joined_rooms, leave_room},
14        profile::get_profile,
15        room::get_room_event,
16        space::{SpaceHierarchyRoomsChunk, get_hierarchy},
17        state::{get_state_events, get_state_events_for_key},
18    },
19    events::{
20        AnyStateEvent, AnyTimelineEvent, StateEventType,
21        room::{
22            avatar::RoomAvatarEventContent, canonical_alias::RoomCanonicalAliasEventContent,
23            name::RoomNameEventContent, topic::RoomTopicEventContent,
24        },
25    },
26};
27
28use anyhow;
29
30use serde::{Deserialize, Serialize};
31
32pub type HttpClient = ruma::client::http_client::HyperNativeTls;
33
34use std::sync::Mutex;
35
36#[derive(Clone)]
37pub struct AppService {
38    client: ruma::Client<HttpClient>,
39    config: Config,
40    pub appservice_id: String,
41    pub user_id: Box<OwnedUserId>,
42    pub joined_rooms: Arc<Mutex<Vec<OwnedRoomId>>>,
43}
44
45pub type RoomState = Vec<ruma::serde::Raw<AnyStateEvent>>;
46
47#[derive(Clone)]
48pub struct JoinedRoomState {
49    pub room_id: OwnedRoomId,
50    pub state: Option<RoomState>,
51}
52
53impl AppService {
54    pub async fn new(config: &Config) -> Result<Self, anyhow::Error> {
55        let client = ruma::Client::builder()
56            .homeserver_url(config.matrix.homeserver.clone())
57            .access_token(Some(config.appservice.access_token.clone()))
58            .build::<HttpClient>()
59            .await?;
60
61        let user_id = UserId::parse(format!(
62            "@{}:{}",
63            config.appservice.sender_localpart, config.matrix.server_name
64        ))?;
65
66        let whoami = client.send_request(whoami::v3::Request::new()).await;
67
68        if whoami.is_err() {
69            tracing::info!("Failed to authenticate with homeserver. Check your access token.");
70            return Err(anyhow::anyhow!(
71                "Failed to authenticate with homeserver. Check your appservice access token."
72            ));
73        }
74
75        let joined_rooms = match client.send_request(joined_rooms::v3::Request::new()).await {
76            Ok(r) => r.joined_rooms,
77            Err(_) => vec![],
78        };
79
80        Ok(Self {
81            client,
82            config: config.clone(),
83            appservice_id: config.appservice.id.clone(),
84            user_id: Box::new(user_id),
85            joined_rooms: Arc::new(Mutex::new(joined_rooms)),
86        })
87    }
88
89    pub fn add_to_joined_rooms(&self, room_id: OwnedRoomId) -> Result<(), anyhow::Error> {
90        let mut rooms = self
91            .joined_rooms
92            .lock()
93            .map_err(|_| anyhow::anyhow!("Failed to acquire lock on joined_rooms"))?;
94
95        if !rooms.contains(&room_id) {
96            rooms.push(room_id.clone());
97        }
98        tracing::info!(
99            "Added room {} to joined rooms. Current count: {}",
100            room_id,
101            rooms.len()
102        );
103        Ok(())
104    }
105
106    pub fn remove_from_joined_rooms(&self, room_id: &OwnedRoomId) -> Result<(), anyhow::Error> {
107        let mut rooms = self
108            .joined_rooms
109            .lock()
110            .map_err(|_| anyhow::anyhow!("Failed to acquire lock on joined_rooms"))?;
111
112        if let Some(pos) = rooms.iter().position(|x| x == room_id) {
113            rooms.remove(pos);
114        }
115        tracing::info!(
116            "Removed room {} from joined rooms. Current count: {}",
117            room_id,
118            rooms.len()
119        );
120        Ok(())
121    }
122
123    pub async fn health_check(&self) -> Result<(), anyhow::Error> {
124        // Perform a simple request to check if the appservice is healthy
125        let response = self.client.send_request(whoami::v3::Request::new()).await?;
126
127        if response.user_id == *self.user_id {
128            Ok(())
129        } else {
130            Err(anyhow::anyhow!("Health check failed: User ID mismatch"))
131        }
132    }
133
134    pub async fn ping_homeserver(
135        &self,
136        id: String,
137    ) -> Result<request_ping::v1::Response, anyhow::Error> {
138        let mut req = request_ping::v1::Request::new(self.appservice_id.to_string());
139
140        req.transaction_id = Some(OwnedTransactionId::from(id));
141
142        let response = self.client.send_request(req).await?;
143        Ok(response)
144    }
145
146    pub fn user_id(&self) -> String {
147        self.user_id.to_string()
148    }
149
150    pub async fn whoami(&self) -> Result<whoami::v3::Response, anyhow::Error> {
151        let r = self.client.send_request(whoami::v3::Request::new()).await?;
152        Ok(r)
153    }
154
155    pub async fn join_room(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
156        let jr = self
157            .client
158            .send_request(join_room_by_id::v3::Request::new(room_id.clone()))
159            .await?;
160
161        tracing::info!("Joined room: {:#?}", jr);
162
163        Ok(jr.room_id == room_id)
164    }
165
166    pub async fn has_joined_room(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
167        let jr = self
168            .client
169            .send_request(get_state_events_for_key::v3::Request::new(
170                room_id,
171                StateEventType::RoomMember,
172                self.user_id(),
173            ))
174            .await?;
175
176        let membership = jr.content.get_field::<String>("membership")?;
177
178        Ok(membership == Some("join".to_string()))
179    }
180
181    pub async fn get_room_state(&self, room_id: OwnedRoomId) -> Result<RoomState, anyhow::Error> {
182        let state = self
183            .client
184            .send_request(get_state_events::v3::Request::new(room_id))
185            .await?;
186
187        Ok(state.room_state)
188    }
189
190    pub async fn is_space(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
191        let hierarchy = self
192            .client
193            .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
194            .await?;
195
196        // hierarchy contains the room itself and all child rooms
197        // so we check if there is more than one room in the hierarchy
198        Ok(hierarchy.rooms.len() > 1)
199    }
200
201    pub async fn leave_room(&self, room_id: OwnedRoomId) -> Result<(), anyhow::Error> {
202        // First leave all child rooms
203        let hierarchy = self
204            .client
205            .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
206            .await?;
207
208        tracing::info!("Hierarchy rooms: {:#?}", hierarchy.rooms.len());
209
210        for room in hierarchy.rooms {
211            if room.room_id == room_id {
212                continue;
213            }
214            let left = self
215                .client
216                .send_request(leave_room::v3::Request::new(room.room_id.clone()))
217                .await?;
218            tracing::info!("Left child room: {:#?}", room.room_id);
219            tracing::info!("Left child room: {:#?}", left);
220        }
221
222        let left = self
223            .client
224            .send_request(leave_room::v3::Request::new(room_id))
225            .await?;
226
227        tracing::info!("Left room: {:#?}", left);
228
229        Ok(())
230    }
231
232    pub async fn joined_rooms(&self) -> Result<Vec<OwnedRoomId>, anyhow::Error> {
233        let jr = self
234            .client
235            .send_request(joined_rooms::v3::Request::new())
236            .await?;
237
238        Ok(jr.joined_rooms)
239    }
240
241    pub async fn room_id_from_alias(
242        &self,
243        room_alias: ruma::OwnedRoomAliasId,
244    ) -> Result<ruma::OwnedRoomId, anyhow::Error> {
245        let room_id = self
246            .client
247            .send_request(get_alias::v3::Request::new(room_alias))
248            .await?;
249
250        Ok(room_id.room_id)
251    }
252
253    pub async fn joined_rooms_state(&self) -> Result<Option<Vec<JoinedRoomState>>, anyhow::Error> {
254        let semaphore = Arc::new(Semaphore::new(10));
255        let curated = self.config.public_rooms.curated;
256        let include_rooms = &self.config.public_rooms.include_rooms;
257
258        if curated && !include_rooms.is_empty() {
259            let mut all_room_ids = Vec::new();
260
261            let space_futures: Vec<_> = include_rooms
262                .iter()
263                .map(|local_part| {
264                    let sem = semaphore.clone();
265                    let server_name = self.config.matrix.server_name.clone();
266                    let self_ref = self;
267                    async move {
268                        let _permit = sem.acquire().await.ok()?;
269
270                        let alias = format!("#{local_part}:{server_name}");
271                        let alias = RoomAliasId::parse(&alias).ok()?;
272                        let room_id = self_ref.room_id_from_alias(alias).await.ok()?;
273
274                        // Get hierarchy for this space
275                        let hierarchy = self_ref
276                            .client
277                            .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
278                            .await
279                            .ok()?;
280
281                        let mut room_ids = vec![room_id];
282                        room_ids.extend(hierarchy.rooms.into_iter().map(|room| room.room_id));
283
284                        Some(room_ids)
285                    }
286                })
287                .collect();
288
289            let hierarchy_results = join_all(space_futures).await;
290
291            let mut unique_room_ids = HashSet::new();
292            for room_ids in hierarchy_results.into_iter().flatten() {
293                for room_id in room_ids {
294                    unique_room_ids.insert(room_id);
295                }
296            }
297            all_room_ids.extend(unique_room_ids);
298
299            let state_futures: Vec<_> = all_room_ids
300                .into_iter()
301                .map(|room_id| {
302                    let sem = semaphore.clone();
303                    let self_ref = self;
304                    async move {
305                        let _permit = sem.acquire().await.ok()?;
306
307                        let st = self_ref
308                            .client
309                            .send_request(get_state_events::v3::Request::new(room_id.clone()))
310                            .await
311                            .ok()?;
312
313                        Some(JoinedRoomState {
314                            room_id,
315                            state: Some(st.room_state),
316                        })
317                    }
318                })
319                .collect();
320
321            let state_results = join_all(state_futures).await;
322            let joined_rooms: Vec<_> = state_results.into_iter().flatten().collect();
323
324            return Ok(Some(joined_rooms));
325        }
326
327        // Handle the non-curated case
328        let jr = self
329            .client
330            .send_request(joined_rooms::v3::Request::new())
331            .await?;
332
333        if jr.joined_rooms.is_empty() {
334            return Ok(None);
335        }
336
337        let state_futures: Vec<_> = jr
338            .joined_rooms
339            .into_iter()
340            .map(|room_id| {
341                let sem = semaphore.clone();
342                let self_ref = self;
343                async move {
344                    let _permit = sem.acquire().await.ok()?;
345
346                    let st = self_ref
347                        .client
348                        .send_request(get_state_events::v3::Request::new(room_id.clone()))
349                        .await
350                        .ok()?;
351
352                    Some(JoinedRoomState {
353                        room_id,
354                        state: Some(st.room_state),
355                    })
356                }
357            })
358            .collect();
359
360        let results = join_all(state_futures).await;
361        let joined_rooms: Vec<_> = results.into_iter().flatten().collect();
362
363        Ok(Some(joined_rooms))
364    }
365
366    pub async fn joined_rooms_state_alt(
367        &self,
368    ) -> Result<Option<Vec<JoinedRoomState>>, anyhow::Error> {
369        let curated = self.config.public_rooms.curated;
370        let include_rooms = &self.config.public_rooms.include_rooms;
371
372        if curated && !include_rooms.is_empty() {
373            // Get subset of joined rooms from config
374            let mut joined_rooms: Vec<JoinedRoomState> = Vec::new();
375
376            // first get top level spaces
377            for local_part in include_rooms {
378                let alias = format!("#{}:{}", local_part, self.config.matrix.server_name);
379
380                let alias = RoomAliasId::parse(&alias)?;
381
382                let room_id = self.room_id_from_alias(alias).await?;
383
384                let mut jrs = JoinedRoomState {
385                    room_id: room_id.clone(),
386                    state: None,
387                };
388
389                let st = self
390                    .client
391                    .send_request(get_state_events::v3::Request::new(room_id.clone()))
392                    .await?;
393
394                jrs.state = Some(st.room_state);
395
396                joined_rooms.push(jrs);
397
398                // find child rooms and add to list
399                let hierarchy = self
400                    .client
401                    .send_request(get_hierarchy::v1::Request::new(room_id))
402                    .await?;
403
404                for room in hierarchy.rooms {
405                    let mut jrs = JoinedRoomState {
406                        room_id: room.room_id.clone(),
407                        state: None,
408                    };
409                    let st = self
410                        .client
411                        .send_request(get_state_events::v3::Request::new(room.room_id.clone()))
412                        .await?;
413
414                    jrs.state = Some(st.room_state);
415
416                    let exists = joined_rooms.iter().any(|r| r.room_id == room.room_id);
417                    if !exists {
418                        joined_rooms.push(jrs);
419                    }
420                }
421            }
422
423            return Ok(Some(joined_rooms));
424        }
425
426        let mut joined_rooms: Vec<JoinedRoomState> = Vec::new();
427
428        let jr = self
429            .client
430            .send_request(joined_rooms::v3::Request::new())
431            .await?;
432
433        if jr.joined_rooms.is_empty() {
434            return Ok(None);
435        }
436
437        for room_id in jr.joined_rooms {
438            let mut jrs = JoinedRoomState {
439                room_id: room_id.clone(),
440                state: None,
441            };
442
443            let st = self
444                .client
445                .send_request(get_state_events::v3::Request::new(room_id))
446                .await?;
447
448            jrs.state = Some(st.room_state);
449
450            joined_rooms.push(jrs);
451        }
452
453        Ok(Some(joined_rooms))
454    }
455
456    pub async fn get_room_event(
457        &self,
458        room_id: OwnedRoomId,
459        event_id: OwnedEventId,
460    ) -> Result<ruma::serde::Raw<AnyTimelineEvent>, anyhow::Error> {
461        let event = self
462            .client
463            .send_request(get_room_event::v3::Request::new(room_id, event_id))
464            .await?;
465
466        Ok(event.event)
467    }
468
469    pub async fn get_profile(
470        &self,
471        user_id: &str,
472    ) -> Result<get_profile::v3::Response, anyhow::Error> {
473        let parsed_id = ruma::OwnedUserId::try_from(user_id.to_string())?;
474
475        let profile = self
476            .client
477            .send_request(get_profile::v3::Request::new(parsed_id))
478            .await?;
479
480        Ok(profile)
481    }
482
483    pub async fn get_room_summary(
484        &self,
485        room_id: OwnedRoomId,
486    ) -> Result<RoomSummary, anyhow::Error> {
487        // find out if appservice has joined the room or not
488        let has_joined = self.has_joined_room(room_id.clone()).await?;
489
490        if !has_joined {
491            // If not joined, we cannot get the state
492            return Err(anyhow::anyhow!(
493                "Appservice has not joined the room: {}",
494                room_id
495            ));
496        }
497
498        let mut room_info = RoomSummary {
499            room_id: room_id.to_string(),
500            ..Default::default()
501        };
502
503        let state = self
504            .client
505            .send_request(get_state_events::v3::Request::new(room_id))
506            .await?;
507
508        for state_event in state.room_state {
509            let event_type = match state_event.get_field::<String>("type") {
510                Ok(Some(t)) => t,
511                _ => continue,
512            };
513
514            match event_type.as_str() {
515                "m.room.name" => {
516                    if let Ok(Some(content)) =
517                        state_event.get_field::<RoomNameEventContent>("content")
518                    {
519                        room_info.name = if content.name.is_empty() {
520                            None
521                        } else {
522                            Some(content.name.to_string())
523                        };
524                    }
525                }
526                "m.room.canonical_alias" => {
527                    if let Ok(Some(content)) =
528                        state_event.get_field::<RoomCanonicalAliasEventContent>("content")
529                    {
530                        room_info.canonical_alias = content.alias.map(|a| a.to_string());
531                    }
532                }
533                "m.room.avatar" => {
534                    if let Ok(Some(content)) =
535                        state_event.get_field::<RoomAvatarEventContent>("content")
536                    {
537                        room_info.avatar_url = match content.url {
538                            Some(url) => {
539                                if url.to_string().is_empty() {
540                                    None
541                                } else {
542                                    Some(url.to_string())
543                                }
544                            }
545                            None => None,
546                        };
547                    }
548                }
549                "commune.room.banner" => {
550                    if let Ok(Some(content)) =
551                        state_event.get_field::<RoomAvatarEventContent>("content")
552                    {
553                        room_info.banner_url = content.url.map(|url| url.to_string());
554                    }
555                }
556                "m.room.topic" => {
557                    if let Ok(Some(content)) =
558                        state_event.get_field::<RoomTopicEventContent>("content")
559                    {
560                        room_info.topic = if content.topic.is_empty() {
561                            None
562                        } else {
563                            Some(content.topic.to_string())
564                        };
565                    }
566                }
567                _ => {}
568            }
569        }
570
571        Ok(room_info)
572    }
573
574    pub async fn get_room_hierarchy(
575        &self,
576        room_id: OwnedRoomId,
577    ) -> Result<Vec<SpaceHierarchyRoomsChunk>, anyhow::Error> {
578        let hierarchy = self
579            .client
580            .send_request(get_hierarchy::v1::Request::new(room_id))
581            .await?;
582
583        Ok(hierarchy.rooms)
584    }
585
586    pub async fn get_public_spaces(&self) -> Result<Option<Vec<RoomSummary>>, anyhow::Error> {
587        let semaphore = Arc::new(Semaphore::new(10));
588
589        if self.config.spaces.include_all {
590            let jr = self
591                .client
592                .send_request(joined_rooms::v3::Request::new())
593                .await?;
594
595            if jr.joined_rooms.is_empty() {
596                return Ok(None);
597            }
598
599            let space_futures: Vec<_> = jr
600                .joined_rooms
601                .into_iter()
602                .map(|room_id| {
603                    let sem = semaphore.clone();
604                    let self_ref = self;
605                    async move {
606                        let _permit = sem.acquire().await.ok()?;
607
608                        let is_space = self_ref.is_space(room_id.clone()).await.ok()?;
609                        if !is_space {
610                            return None;
611                        }
612
613                        let summary = self_ref.get_room_summary(room_id).await.ok()?;
614                        Some(summary)
615                    }
616                })
617                .collect();
618
619            let results = join_all(space_futures).await;
620            let spaces: Vec<_> = results.into_iter().flatten().collect();
621
622            if spaces.is_empty() {
623                return Ok(None);
624            }
625
626            return Ok(Some(spaces));
627        }
628
629        let default_spaces = self.config.spaces.default.clone();
630
631        if default_spaces.is_empty() {
632            return Ok(None);
633        }
634
635        let space_futures: Vec<_> = default_spaces
636            .into_iter()
637            .map(|space| {
638                let sem = semaphore.clone();
639                let server_name = self.config.matrix.server_name.clone();
640                let self_ref = self;
641                async move {
642                    let _permit = sem.acquire().await.ok()?;
643
644                    let raw_alias = format!("#{space}:{server_name}");
645
646                    let alias = RoomAliasId::parse(&raw_alias).ok()?;
647
648                    let room_id = self_ref.room_id_from_alias(alias).await.ok()?;
649
650                    let summary = self_ref.get_room_summary(room_id).await.ok()?;
651                    Some(summary)
652                }
653            })
654            .collect();
655
656        let results = join_all(space_futures).await;
657        let spaces: Vec<_> = results.into_iter().flatten().collect();
658
659        Ok(Some(spaces))
660    }
661}
662
663#[derive(Default, Clone, Debug, Deserialize, Serialize)]
664pub struct RoomSummary {
665    pub room_id: String,
666    #[serde(skip_serializing_if = "Option::is_none")]
667    pub name: Option<String>,
668    #[serde(skip_serializing_if = "Option::is_none")]
669    pub canonical_alias: Option<String>,
670    #[serde(skip_serializing_if = "Option::is_none")]
671    pub avatar_url: Option<String>,
672    #[serde(skip_serializing_if = "Option::is_none")]
673    pub banner_url: Option<String>,
674    #[serde(skip_serializing_if = "Option::is_none")]
675    pub topic: Option<String>,
676}