1use crate::config::Config;
2use futures::future::join_all;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::Semaphore;
6
7use ruma::{
8 OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomAliasId, UserId,
9 api::client::{
10 account::whoami,
11 alias::get_alias,
12 appservice::request_ping,
13 membership::{join_room_by_id, joined_rooms, leave_room},
14 profile::get_profile,
15 room::get_room_event,
16 space::{SpaceHierarchyRoomsChunk, get_hierarchy},
17 state::{get_state_events, get_state_events_for_key},
18 },
19 events::{
20 AnyStateEvent, AnyTimelineEvent, StateEventType,
21 room::{
22 avatar::RoomAvatarEventContent, canonical_alias::RoomCanonicalAliasEventContent,
23 name::RoomNameEventContent, topic::RoomTopicEventContent,
24 },
25 },
26};
27
28use anyhow;
29
30use serde::{Deserialize, Serialize};
31
32pub type HttpClient = ruma::client::http_client::HyperNativeTls;
33
34use std::sync::Mutex;
35
36#[derive(Clone)]
37pub struct AppService {
38 client: ruma::Client<HttpClient>,
39 config: Config,
40 pub appservice_id: String,
41 pub user_id: Box<OwnedUserId>,
42 pub joined_rooms: Arc<Mutex<Vec<OwnedRoomId>>>,
43}
44
45pub type RoomState = Vec<ruma::serde::Raw<AnyStateEvent>>;
46
47#[derive(Clone)]
48pub struct JoinedRoomState {
49 pub room_id: OwnedRoomId,
50 pub state: Option<RoomState>,
51}
52
53impl AppService {
54 pub async fn new(config: &Config) -> Result<Self, anyhow::Error> {
55 let client = ruma::Client::builder()
56 .homeserver_url(config.matrix.homeserver.clone())
57 .access_token(Some(config.appservice.access_token.clone()))
58 .build::<HttpClient>()
59 .await?;
60
61 let user_id = UserId::parse(format!(
62 "@{}:{}",
63 config.appservice.sender_localpart, config.matrix.server_name
64 ))?;
65
66 let whoami = client.send_request(whoami::v3::Request::new()).await;
67
68 if whoami.is_err() {
69 tracing::info!("Failed to authenticate with homeserver. Check your access token.");
70 return Err(anyhow::anyhow!(
71 "Failed to authenticate with homeserver. Check your appservice access token."
72 ));
73 }
74
75 let joined_rooms = match client.send_request(joined_rooms::v3::Request::new()).await {
76 Ok(r) => r.joined_rooms,
77 Err(_) => vec![],
78 };
79
80 Ok(Self {
81 client,
82 config: config.clone(),
83 appservice_id: config.appservice.id.clone(),
84 user_id: Box::new(user_id),
85 joined_rooms: Arc::new(Mutex::new(joined_rooms)),
86 })
87 }
88
89 pub fn add_to_joined_rooms(&self, room_id: OwnedRoomId) -> Result<(), anyhow::Error> {
90 let mut rooms = self
91 .joined_rooms
92 .lock()
93 .map_err(|_| anyhow::anyhow!("Failed to acquire lock on joined_rooms"))?;
94
95 if !rooms.contains(&room_id) {
96 rooms.push(room_id.clone());
97 }
98 tracing::info!(
99 "Added room {} to joined rooms. Current count: {}",
100 room_id,
101 rooms.len()
102 );
103 Ok(())
104 }
105
106 pub fn remove_from_joined_rooms(&self, room_id: &OwnedRoomId) -> Result<(), anyhow::Error> {
107 let mut rooms = self
108 .joined_rooms
109 .lock()
110 .map_err(|_| anyhow::anyhow!("Failed to acquire lock on joined_rooms"))?;
111
112 if let Some(pos) = rooms.iter().position(|x| x == room_id) {
113 rooms.remove(pos);
114 }
115 tracing::info!(
116 "Removed room {} from joined rooms. Current count: {}",
117 room_id,
118 rooms.len()
119 );
120 Ok(())
121 }
122
123 pub async fn health_check(&self) -> Result<(), anyhow::Error> {
124 let response = self.client.send_request(whoami::v3::Request::new()).await?;
126
127 if response.user_id == *self.user_id {
128 Ok(())
129 } else {
130 Err(anyhow::anyhow!("Health check failed: User ID mismatch"))
131 }
132 }
133
134 pub async fn ping_homeserver(
135 &self,
136 id: String,
137 ) -> Result<request_ping::v1::Response, anyhow::Error> {
138 let mut req = request_ping::v1::Request::new(self.appservice_id.to_string());
139
140 req.transaction_id = Some(OwnedTransactionId::from(id));
141
142 let response = self.client.send_request(req).await?;
143 Ok(response)
144 }
145
146 pub fn user_id(&self) -> String {
147 self.user_id.to_string()
148 }
149
150 pub async fn whoami(&self) -> Result<whoami::v3::Response, anyhow::Error> {
151 let r = self.client.send_request(whoami::v3::Request::new()).await?;
152 Ok(r)
153 }
154
155 pub async fn join_room(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
156 let jr = self
157 .client
158 .send_request(join_room_by_id::v3::Request::new(room_id.clone()))
159 .await?;
160
161 tracing::info!("Joined room: {:#?}", jr);
162
163 Ok(jr.room_id == room_id)
164 }
165
166 pub async fn has_joined_room(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
167 let jr = self
168 .client
169 .send_request(get_state_events_for_key::v3::Request::new(
170 room_id,
171 StateEventType::RoomMember,
172 self.user_id(),
173 ))
174 .await?;
175
176 let membership = jr.content.get_field::<String>("membership")?;
177
178 Ok(membership == Some("join".to_string()))
179 }
180
181 pub async fn get_room_state(&self, room_id: OwnedRoomId) -> Result<RoomState, anyhow::Error> {
182 let state = self
183 .client
184 .send_request(get_state_events::v3::Request::new(room_id))
185 .await?;
186
187 Ok(state.room_state)
188 }
189
190 pub async fn is_space(&self, room_id: OwnedRoomId) -> Result<bool, anyhow::Error> {
191 let hierarchy = self
192 .client
193 .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
194 .await?;
195
196 Ok(hierarchy.rooms.len() > 1)
199 }
200
201 pub async fn leave_room(&self, room_id: OwnedRoomId) -> Result<(), anyhow::Error> {
202 let hierarchy = self
204 .client
205 .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
206 .await?;
207
208 tracing::info!("Hierarchy rooms: {:#?}", hierarchy.rooms.len());
209
210 for room in hierarchy.rooms {
211 if room.room_id == room_id {
212 continue;
213 }
214 let left = self
215 .client
216 .send_request(leave_room::v3::Request::new(room.room_id.clone()))
217 .await?;
218 tracing::info!("Left child room: {:#?}", room.room_id);
219 tracing::info!("Left child room: {:#?}", left);
220 }
221
222 let left = self
223 .client
224 .send_request(leave_room::v3::Request::new(room_id))
225 .await?;
226
227 tracing::info!("Left room: {:#?}", left);
228
229 Ok(())
230 }
231
232 pub async fn joined_rooms(&self) -> Result<Vec<OwnedRoomId>, anyhow::Error> {
233 let jr = self
234 .client
235 .send_request(joined_rooms::v3::Request::new())
236 .await?;
237
238 Ok(jr.joined_rooms)
239 }
240
241 pub async fn room_id_from_alias(
242 &self,
243 room_alias: ruma::OwnedRoomAliasId,
244 ) -> Result<ruma::OwnedRoomId, anyhow::Error> {
245 let room_id = self
246 .client
247 .send_request(get_alias::v3::Request::new(room_alias))
248 .await?;
249
250 Ok(room_id.room_id)
251 }
252
253 pub async fn joined_rooms_state(&self) -> Result<Option<Vec<JoinedRoomState>>, anyhow::Error> {
254 let semaphore = Arc::new(Semaphore::new(10));
255 let curated = self.config.public_rooms.curated;
256 let include_rooms = &self.config.public_rooms.include_rooms;
257
258 if curated && !include_rooms.is_empty() {
259 let mut all_room_ids = Vec::new();
260
261 let space_futures: Vec<_> = include_rooms
262 .iter()
263 .map(|local_part| {
264 let sem = semaphore.clone();
265 let server_name = self.config.matrix.server_name.clone();
266 let self_ref = self;
267 async move {
268 let _permit = sem.acquire().await.ok()?;
269
270 let alias = format!("#{local_part}:{server_name}");
271 let alias = RoomAliasId::parse(&alias).ok()?;
272 let room_id = self_ref.room_id_from_alias(alias).await.ok()?;
273
274 let hierarchy = self_ref
276 .client
277 .send_request(get_hierarchy::v1::Request::new(room_id.clone()))
278 .await
279 .ok()?;
280
281 let mut room_ids = vec![room_id];
282 room_ids.extend(hierarchy.rooms.into_iter().map(|room| room.room_id));
283
284 Some(room_ids)
285 }
286 })
287 .collect();
288
289 let hierarchy_results = join_all(space_futures).await;
290
291 let mut unique_room_ids = HashSet::new();
292 for room_ids in hierarchy_results.into_iter().flatten() {
293 for room_id in room_ids {
294 unique_room_ids.insert(room_id);
295 }
296 }
297 all_room_ids.extend(unique_room_ids);
298
299 let state_futures: Vec<_> = all_room_ids
300 .into_iter()
301 .map(|room_id| {
302 let sem = semaphore.clone();
303 let self_ref = self;
304 async move {
305 let _permit = sem.acquire().await.ok()?;
306
307 let st = self_ref
308 .client
309 .send_request(get_state_events::v3::Request::new(room_id.clone()))
310 .await
311 .ok()?;
312
313 Some(JoinedRoomState {
314 room_id,
315 state: Some(st.room_state),
316 })
317 }
318 })
319 .collect();
320
321 let state_results = join_all(state_futures).await;
322 let joined_rooms: Vec<_> = state_results.into_iter().flatten().collect();
323
324 return Ok(Some(joined_rooms));
325 }
326
327 let jr = self
329 .client
330 .send_request(joined_rooms::v3::Request::new())
331 .await?;
332
333 if jr.joined_rooms.is_empty() {
334 return Ok(None);
335 }
336
337 let state_futures: Vec<_> = jr
338 .joined_rooms
339 .into_iter()
340 .map(|room_id| {
341 let sem = semaphore.clone();
342 let self_ref = self;
343 async move {
344 let _permit = sem.acquire().await.ok()?;
345
346 let st = self_ref
347 .client
348 .send_request(get_state_events::v3::Request::new(room_id.clone()))
349 .await
350 .ok()?;
351
352 Some(JoinedRoomState {
353 room_id,
354 state: Some(st.room_state),
355 })
356 }
357 })
358 .collect();
359
360 let results = join_all(state_futures).await;
361 let joined_rooms: Vec<_> = results.into_iter().flatten().collect();
362
363 Ok(Some(joined_rooms))
364 }
365
366 pub async fn joined_rooms_state_alt(
367 &self,
368 ) -> Result<Option<Vec<JoinedRoomState>>, anyhow::Error> {
369 let curated = self.config.public_rooms.curated;
370 let include_rooms = &self.config.public_rooms.include_rooms;
371
372 if curated && !include_rooms.is_empty() {
373 let mut joined_rooms: Vec<JoinedRoomState> = Vec::new();
375
376 for local_part in include_rooms {
378 let alias = format!("#{}:{}", local_part, self.config.matrix.server_name);
379
380 let alias = RoomAliasId::parse(&alias)?;
381
382 let room_id = self.room_id_from_alias(alias).await?;
383
384 let mut jrs = JoinedRoomState {
385 room_id: room_id.clone(),
386 state: None,
387 };
388
389 let st = self
390 .client
391 .send_request(get_state_events::v3::Request::new(room_id.clone()))
392 .await?;
393
394 jrs.state = Some(st.room_state);
395
396 joined_rooms.push(jrs);
397
398 let hierarchy = self
400 .client
401 .send_request(get_hierarchy::v1::Request::new(room_id))
402 .await?;
403
404 for room in hierarchy.rooms {
405 let mut jrs = JoinedRoomState {
406 room_id: room.room_id.clone(),
407 state: None,
408 };
409 let st = self
410 .client
411 .send_request(get_state_events::v3::Request::new(room.room_id.clone()))
412 .await?;
413
414 jrs.state = Some(st.room_state);
415
416 let exists = joined_rooms.iter().any(|r| r.room_id == room.room_id);
417 if !exists {
418 joined_rooms.push(jrs);
419 }
420 }
421 }
422
423 return Ok(Some(joined_rooms));
424 }
425
426 let mut joined_rooms: Vec<JoinedRoomState> = Vec::new();
427
428 let jr = self
429 .client
430 .send_request(joined_rooms::v3::Request::new())
431 .await?;
432
433 if jr.joined_rooms.is_empty() {
434 return Ok(None);
435 }
436
437 for room_id in jr.joined_rooms {
438 let mut jrs = JoinedRoomState {
439 room_id: room_id.clone(),
440 state: None,
441 };
442
443 let st = self
444 .client
445 .send_request(get_state_events::v3::Request::new(room_id))
446 .await?;
447
448 jrs.state = Some(st.room_state);
449
450 joined_rooms.push(jrs);
451 }
452
453 Ok(Some(joined_rooms))
454 }
455
456 pub async fn get_room_event(
457 &self,
458 room_id: OwnedRoomId,
459 event_id: OwnedEventId,
460 ) -> Result<ruma::serde::Raw<AnyTimelineEvent>, anyhow::Error> {
461 let event = self
462 .client
463 .send_request(get_room_event::v3::Request::new(room_id, event_id))
464 .await?;
465
466 Ok(event.event)
467 }
468
469 pub async fn get_profile(
470 &self,
471 user_id: &str,
472 ) -> Result<get_profile::v3::Response, anyhow::Error> {
473 let parsed_id = ruma::OwnedUserId::try_from(user_id.to_string())?;
474
475 let profile = self
476 .client
477 .send_request(get_profile::v3::Request::new(parsed_id))
478 .await?;
479
480 Ok(profile)
481 }
482
483 pub async fn get_room_summary(
484 &self,
485 room_id: OwnedRoomId,
486 ) -> Result<RoomSummary, anyhow::Error> {
487 let has_joined = self.has_joined_room(room_id.clone()).await?;
489
490 if !has_joined {
491 return Err(anyhow::anyhow!(
493 "Appservice has not joined the room: {}",
494 room_id
495 ));
496 }
497
498 let mut room_info = RoomSummary {
499 room_id: room_id.to_string(),
500 ..Default::default()
501 };
502
503 let state = self
504 .client
505 .send_request(get_state_events::v3::Request::new(room_id))
506 .await?;
507
508 for state_event in state.room_state {
509 let event_type = match state_event.get_field::<String>("type") {
510 Ok(Some(t)) => t,
511 _ => continue,
512 };
513
514 match event_type.as_str() {
515 "m.room.name" => {
516 if let Ok(Some(content)) =
517 state_event.get_field::<RoomNameEventContent>("content")
518 {
519 room_info.name = if content.name.is_empty() {
520 None
521 } else {
522 Some(content.name.to_string())
523 };
524 }
525 }
526 "m.room.canonical_alias" => {
527 if let Ok(Some(content)) =
528 state_event.get_field::<RoomCanonicalAliasEventContent>("content")
529 {
530 room_info.canonical_alias = content.alias.map(|a| a.to_string());
531 }
532 }
533 "m.room.avatar" => {
534 if let Ok(Some(content)) =
535 state_event.get_field::<RoomAvatarEventContent>("content")
536 {
537 room_info.avatar_url = match content.url {
538 Some(url) => {
539 if url.to_string().is_empty() {
540 None
541 } else {
542 Some(url.to_string())
543 }
544 }
545 None => None,
546 };
547 }
548 }
549 "commune.room.banner" => {
550 if let Ok(Some(content)) =
551 state_event.get_field::<RoomAvatarEventContent>("content")
552 {
553 room_info.banner_url = content.url.map(|url| url.to_string());
554 }
555 }
556 "m.room.topic" => {
557 if let Ok(Some(content)) =
558 state_event.get_field::<RoomTopicEventContent>("content")
559 {
560 room_info.topic = if content.topic.is_empty() {
561 None
562 } else {
563 Some(content.topic.to_string())
564 };
565 }
566 }
567 _ => {}
568 }
569 }
570
571 Ok(room_info)
572 }
573
574 pub async fn get_room_hierarchy(
575 &self,
576 room_id: OwnedRoomId,
577 ) -> Result<Vec<SpaceHierarchyRoomsChunk>, anyhow::Error> {
578 let hierarchy = self
579 .client
580 .send_request(get_hierarchy::v1::Request::new(room_id))
581 .await?;
582
583 Ok(hierarchy.rooms)
584 }
585
586 pub async fn get_public_spaces(&self) -> Result<Option<Vec<RoomSummary>>, anyhow::Error> {
587 let semaphore = Arc::new(Semaphore::new(10));
588
589 if self.config.spaces.include_all {
590 let jr = self
591 .client
592 .send_request(joined_rooms::v3::Request::new())
593 .await?;
594
595 if jr.joined_rooms.is_empty() {
596 return Ok(None);
597 }
598
599 let space_futures: Vec<_> = jr
600 .joined_rooms
601 .into_iter()
602 .map(|room_id| {
603 let sem = semaphore.clone();
604 let self_ref = self;
605 async move {
606 let _permit = sem.acquire().await.ok()?;
607
608 let is_space = self_ref.is_space(room_id.clone()).await.ok()?;
609 if !is_space {
610 return None;
611 }
612
613 let summary = self_ref.get_room_summary(room_id).await.ok()?;
614 Some(summary)
615 }
616 })
617 .collect();
618
619 let results = join_all(space_futures).await;
620 let spaces: Vec<_> = results.into_iter().flatten().collect();
621
622 if spaces.is_empty() {
623 return Ok(None);
624 }
625
626 return Ok(Some(spaces));
627 }
628
629 let default_spaces = self.config.spaces.default.clone();
630
631 if default_spaces.is_empty() {
632 return Ok(None);
633 }
634
635 let space_futures: Vec<_> = default_spaces
636 .into_iter()
637 .map(|space| {
638 let sem = semaphore.clone();
639 let server_name = self.config.matrix.server_name.clone();
640 let self_ref = self;
641 async move {
642 let _permit = sem.acquire().await.ok()?;
643
644 let raw_alias = format!("#{space}:{server_name}");
645
646 let alias = RoomAliasId::parse(&raw_alias).ok()?;
647
648 let room_id = self_ref.room_id_from_alias(alias).await.ok()?;
649
650 let summary = self_ref.get_room_summary(room_id).await.ok()?;
651 Some(summary)
652 }
653 })
654 .collect();
655
656 let results = join_all(space_futures).await;
657 let spaces: Vec<_> = results.into_iter().flatten().collect();
658
659 Ok(Some(spaces))
660 }
661}
662
663#[derive(Default, Clone, Debug, Deserialize, Serialize)]
664pub struct RoomSummary {
665 pub room_id: String,
666 #[serde(skip_serializing_if = "Option::is_none")]
667 pub name: Option<String>,
668 #[serde(skip_serializing_if = "Option::is_none")]
669 pub canonical_alias: Option<String>,
670 #[serde(skip_serializing_if = "Option::is_none")]
671 pub avatar_url: Option<String>,
672 #[serde(skip_serializing_if = "Option::is_none")]
673 pub banner_url: Option<String>,
674 #[serde(skip_serializing_if = "Option::is_none")]
675 pub topic: Option<String>,
676}