tauri_plugin_matrix_svelte/matrix/workers.rs
1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2
3use anyhow::bail;
4use futures::{pin_mut, StreamExt};
5use matrix_sdk::{
6 ruma::{
7 api::client::receipt::create_receipt::v3::ReceiptType, events::receipt::ReceiptThread,
8 matrix_uri::MatrixId, OwnedRoomId, RoomOrAliasId,
9 },
10 Client,
11};
12use tauri::{AppHandle, Listener, Manager, Runtime};
13use tokio::{
14 runtime::Handle,
15 sync::{mpsc::UnboundedReceiver, Mutex},
16 task::JoinHandle,
17};
18
19use crate::{
20 matrix::{
21 notifications::enqueue_popup_notification,
22 requests::submit_async_request,
23 room::rooms_list::{enqueue_rooms_list_update, RoomsCollectionStatus, RoomsListUpdate},
24 rooms::UnreadMessageCount,
25 singletons::{broadcast_event, UIUpdateMessage, ALL_JOINED_ROOMS, CLIENT},
26 sync::sync,
27 timeline::{PaginationDirection, TimelineUpdate},
28 user_power_level::UserPowerLevels,
29 user_profile::{enqueue_user_profile_update, UserProfileUpdate},
30 utils::current_user_id,
31 },
32 models::matrix::{MatrixSvelteListenEvent, MatrixUpdateCurrentActiveRoom},
33};
34
35use super::{requests::MatrixRequest, room::rooms_list::RoomsList, utils::debounce_broadcast};
36
37/// The main loop that actually uses a Matrix client
38pub async fn async_main_loop<R: Runtime>(
39 app_handle: AppHandle<R>,
40 client: Client,
41) -> anyhow::Result<()> {
42 let logged_in_user_id = client
43 .user_id()
44 .expect("BUG: client.user_id() returned None after successful login!");
45 let status = RoomsCollectionStatus::Loading(format!(
46 "Logged in as {}.\n → Loading rooms...",
47 logged_in_user_id
48 ));
49 // enqueue_popup_notification(status.clone());
50 enqueue_rooms_list_update(RoomsListUpdate::Status { status });
51
52 // Listen for updates to the ignored user list.
53 // handle_ignore_user_list_subscriber(client.clone());
54 let rooms_list_handle = app_handle.app_handle().clone();
55 tauri::async_runtime::spawn(ui_worker(rooms_list_handle));
56
57 // call sync
58 sync(&app_handle, client).await?;
59
60 bail!("room list service sync loop ended unexpectedly")
61}
62
63/// The entry point for an async worker thread that can run async tasks.
64///
65/// All this thread does is wait for [`MatrixRequests`] from the main UI-driven non-async thread(s)
66/// and then executes them within an async runtime context.
67pub async fn async_worker(
68 mut request_receiver: UnboundedReceiver<MatrixRequest>,
69) -> anyhow::Result<()> {
70 println!("Started async_worker task.");
71 let mut tasks_list: BTreeMap<OwnedRoomId, JoinHandle<()>> = BTreeMap::new();
72 while let Some(request) = request_receiver.recv().await {
73 match request {
74 MatrixRequest::PaginateRoomTimeline {
75 room_id,
76 num_events,
77 direction,
78 } => {
79 let (timeline, sender) = {
80 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
81 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
82 println!("Skipping pagination request for not-yet-known room {room_id}");
83 continue;
84 };
85
86 let timeline_ref = room_info.timeline.clone();
87 let sender = room_info.timeline_update_sender.clone();
88 (timeline_ref, sender)
89 };
90
91 // Spawn a new async task that will make the actual pagination request.
92 let _paginate_task = Handle::current().spawn(async move {
93 println!("Starting {direction} pagination request for room {room_id}...");
94 sender.send(TimelineUpdate::PaginationRunning(direction)).unwrap();
95 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
96
97 let res = if direction == PaginationDirection::Forwards {
98 timeline.paginate_forwards(num_events).await
99 } else {
100 timeline.paginate_backwards(num_events).await
101 };
102
103 match res {
104 Ok(fully_paginated) => {
105 println!("Completed {direction} pagination request for room {room_id}, hit {} of timeline? {}",
106 if direction == PaginationDirection::Forwards { "end" } else { "start" },
107 if fully_paginated { "yes" } else { "no" },
108 );
109 sender.send(TimelineUpdate::PaginationIdle {
110 fully_paginated,
111 direction,
112 }).unwrap();
113 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
114 }
115 Err(error) => {
116 eprintln!("Error sending {direction} pagination request for room {room_id}: {error:?}");
117 sender.send(TimelineUpdate::PaginationError {
118 error,
119 direction,
120 }).unwrap();
121 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
122 }
123 }
124 });
125 }
126
127 MatrixRequest::EditMessage {
128 room_id,
129 timeline_event_item_id: timeline_event_id,
130 edited_content,
131 } => {
132 let (timeline, sender) = {
133 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
134 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
135 eprintln!("BUG: room info not found for edit request, room {room_id}");
136 continue;
137 };
138 (
139 room_info.timeline.clone(),
140 room_info.timeline_update_sender.clone(),
141 )
142 };
143
144 // Spawn a new async task that will make the actual edit request.
145 let _edit_task = Handle::current().spawn(async move {
146 println!("Sending request to edit message {timeline_event_id:?} in room {room_id}...");
147 let result = timeline.edit(&timeline_event_id, edited_content).await;
148 match result {
149 Ok(_) => println!("Successfully edited message {timeline_event_id:?} in room {room_id}."),
150 Err(ref e) => eprintln!("Error editing message {timeline_event_id:?} in room {room_id}: {e:?}"),
151 }
152 sender.send(TimelineUpdate::MessageEdited {
153 timeline_event_id,
154 result,
155 }).unwrap();
156 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
157 });
158 }
159
160 MatrixRequest::FetchDetailsForEvent { room_id, event_id } => {
161 let (timeline, sender) = {
162 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
163 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
164 eprintln!("BUG: room info not found for fetch details for event request {room_id}");
165 continue;
166 };
167
168 (
169 room_info.timeline.clone(),
170 room_info.timeline_update_sender.clone(),
171 )
172 };
173
174 // Spawn a new async task that will make the actual fetch request.
175 let _fetch_task = Handle::current().spawn(async move {
176 // println!("Sending request to fetch details for event {event_id} in room {room_id}...");
177 let result = timeline.fetch_details_for_event(&event_id).await;
178 match result {
179 Ok(_) => {
180 // println!("Successfully fetched details for event {event_id} in room {room_id}.");
181 }
182 Err(ref _e) => {
183 // eprintln!("Error fetching details for event {event_id} in room {room_id}: {e:?}");
184 }
185 }
186 sender
187 .send(TimelineUpdate::EventDetailsFetched { event_id, result })
188 .unwrap();
189 broadcast_event(UIUpdateMessage::RefreshUI)
190 .expect("Couldn't broadcast event to UI");
191 });
192 }
193
194 MatrixRequest::SyncRoomMemberList { room_id } => {
195 let (timeline, sender) = {
196 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
197 let Some(room_info) = all_joined_rooms.get(&room_id) else {
198 eprintln!("BUG: room info not found for fetch members request {room_id}");
199 continue;
200 };
201
202 (
203 room_info.timeline.clone(),
204 room_info.timeline_update_sender.clone(),
205 )
206 };
207
208 // Spawn a new async task that will make the actual fetch request.
209 let _fetch_task = Handle::current().spawn(async move {
210 println!("Sending sync room members request for room {room_id}...");
211 timeline.fetch_members().await;
212 println!("Completed sync room members request for room {room_id}.");
213 sender.send(TimelineUpdate::RoomMembersSynced).unwrap();
214 broadcast_event(UIUpdateMessage::RefreshUI)
215 .expect("Couldn't broadcast event to UI");
216 });
217 }
218
219 // MatrixRequest::JoinRoom { room_id } => {
220 // let Some(client) = CLIENT.get() else { continue };
221 // let _join_room_task = Handle::current().spawn(async move {
222 // println!("Sending request to join room {room_id}...");
223 // let result_action = if let Some(room) = client.get_room(&room_id) {
224 // match room.join().await {
225 // Ok(()) => {
226 // println!("Successfully joined room {room_id}.");
227 // JoinRoomAction::Joined { room_id }
228 // }
229 // Err(e) => {
230 // eprintln!("Error joining room {room_id}: {e:?}");
231 // JoinRoomAction::Failed { room_id, error: e }
232 // }
233 // }
234 // } else {
235 // eprintln!("BUG: client could not get room with ID {room_id}");
236 // JoinRoomAction::Failed {
237 // room_id,
238 // error: matrix_sdk::Error::UnknownError(
239 // String::from("Client couldn't locate room to join it.").into(),
240 // ),
241 // }
242 // };
243 // Cx::post_action(result_action);
244 // });
245 // }
246 // MatrixRequest::LeaveRoom { room_id } => {
247 // let Some(client) = CLIENT.get() else { continue };
248 // let _leave_room_task = Handle::current().spawn(async move {
249 // println!("Sending request to leave room {room_id}...");
250 // let result_action = if let Some(room) = client.get_room(&room_id) {
251 // match room.leave().await {
252 // Ok(()) => {
253 // println!("Successfully left room {room_id}.");
254 // LeaveRoomAction::Left { room_id }
255 // }
256 // Err(e) => {
257 // eprintln!("Error leaving room {room_id}: {e:?}");
258 // LeaveRoomAction::Failed {
259 // room_id,
260 // error: e.to_string(),
261 // }
262 // }
263 // }
264 // } else {
265 // eprintln!("BUG: client could not get room with ID {room_id}");
266 // LeaveRoomAction::Failed {
267 // room_id,
268 // error: String::from("Client couldn't locate room to leave it."),
269 // }
270 // };
271 // Cx::post_action(result_action);
272 // });
273 // }
274 MatrixRequest::GetRoomMembers {
275 room_id,
276 memberships,
277 local_only,
278 } => {
279 let (timeline, sender) = {
280 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
281 let Some(room_info) = all_joined_rooms.get(&room_id) else {
282 println!("BUG: room info not found for get room members request {room_id}");
283 continue;
284 };
285 (
286 room_info.timeline.clone(),
287 room_info.timeline_update_sender.clone(),
288 )
289 };
290
291 let _get_members_task = Handle::current().spawn(async move {
292 let room = timeline.room();
293
294 if local_only {
295 if let Ok(members) = room.members_no_sync(memberships).await {
296 println!(
297 "Got {} members from cache for room {}",
298 members.len(),
299 room_id
300 );
301 sender
302 .send(TimelineUpdate::RoomMembersListFetched { members })
303 .unwrap();
304 }
305 } else {
306 if let Ok(members) = room.members(memberships).await {
307 println!(
308 "Successfully fetched {} members from server for room {}",
309 members.len(),
310 room_id
311 );
312 sender
313 .send(TimelineUpdate::RoomMembersListFetched { members })
314 .unwrap();
315 }
316 }
317
318 broadcast_event(UIUpdateMessage::RefreshUI)
319 .expect("Couldn't broadcast event to UI");
320 });
321 }
322
323 // MatrixRequest::GetUserProfile {
324 // user_id,
325 // room_id,
326 // local_only,
327 // } => {
328 // let Some(client) = CLIENT.get() else { continue };
329 // let _fetch_task = Handle::current().spawn(async move {
330 // // println!("Sending get user profile request: user: {user_id}, \
331 // // room: {room_id:?}, local_only: {local_only}...",
332 // // );
333
334 // let mut update = None;
335
336 // if let Some(room_id) = room_id.as_ref() {
337 // if let Some(room) = client.get_room(room_id) {
338 // let member = if local_only {
339 // room.get_member_no_sync(&user_id).await
340 // } else {
341 // room.get_member(&user_id).await
342 // };
343 // if let Ok(Some(room_member)) = member {
344 // update = Some(UserProfileUpdate::Full {
345 // new_profile: UserProfile {
346 // username: room_member.display_name().map(|u| u.to_owned()),
347 // user_id: user_id.clone(),
348 // avatar_state: AvatarState::Known(room_member.avatar_url().map(|u| u.to_owned())),
349 // },
350 // room_id: room_id.to_owned(),
351 // room_member,
352 // });
353 // } else {
354 // println!("User profile request: user {user_id} was not a member of room {room_id}");
355 // }
356 // } else {
357 // println!("User profile request: client could not get room with ID {room_id}");
358 // }
359 // }
360
361 // if !local_only {
362 // if update.is_none() {
363 // if let Ok(response) = client.account().fetch_user_profile_of(&user_id).await {
364 // update = Some(UserProfileUpdate::UserProfileOnly(
365 // UserProfile {
366 // username: response.displayname,
367 // user_id: user_id.clone(),
368 // avatar_state: AvatarState::Known(response.avatar_url),
369 // }
370 // ));
371 // } else {
372 // println!("User profile request: client could not get user with ID {user_id}");
373 // }
374 // }
375
376 // match update.as_mut() {
377 // Some(UserProfileUpdate::Full { new_profile: UserProfile { username, .. }, .. }) if username.is_none() => {
378 // if let Ok(response) = client.account().fetch_user_profile_of(&user_id).await {
379 // *username = response.displayname;
380 // }
381 // }
382 // _ => { }
383 // }
384 // }
385
386 // if let Some(upd) = update {
387 // // println!("Successfully completed get user profile request: user: {user_id}, room: {room_id:?}, local_only: {local_only}.");
388 // enqueue_user_profile_update(upd);
389 // } else {
390 // println!("Failed to get user profile: user: {user_id}, room: {room_id:?}, local_only: {local_only}.");
391 // }
392 // });
393 // }
394 MatrixRequest::GetNumberUnreadMessages { room_id } => {
395 let (timeline, sender) = {
396 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
397 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
398 println!("Skipping get number of unread messages request for not-yet-known room {room_id}");
399 continue;
400 };
401
402 (
403 room_info.timeline.clone(),
404 room_info.timeline_update_sender.clone(),
405 )
406 };
407 let _get_unreads_task = Handle::current().spawn(async move {
408 match sender.send(TimelineUpdate::NewUnreadMessagesCount(
409 UnreadMessageCount::Known(timeline.room().num_unread_messages())
410 )) {
411 Ok(_) => {
412 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
413 },
414 Err(e) => println!("Failed to send timeline update: {e:?} for GetNumberUnreadMessages request for room {room_id}"),
415 }
416 enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
417 room_id: room_id.clone(),
418 count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
419 unread_mentions:timeline.room().num_unread_mentions(),
420 });
421 });
422 }
423 MatrixRequest::IgnoreUser {
424 ignore,
425 room_member,
426 room_id,
427 } => {
428 let Some(client) = CLIENT.get() else { continue };
429 let _ignore_task = Handle::current().spawn(async move {
430 let user_id = room_member.user_id();
431 println!("Sending request to {}ignore user: {user_id}...", if ignore { "" } else { "un" });
432 let ignore_result = if ignore {
433 room_member.ignore().await
434 } else {
435 room_member.unignore().await
436 };
437
438 println!("{} user {user_id} {}",
439 if ignore { "Ignoring" } else { "Unignoring" },
440 if ignore_result.is_ok() { "succeeded." } else { "failed." },
441 );
442
443 if ignore_result.is_err() {
444 return;
445 }
446
447 // We need to re-acquire the `RoomMember` object now that its state
448 // has changed, i.e., the user has been (un)ignored.
449 // We then need to send an update to replace the cached `RoomMember`
450 // with the now-stale ignored state.
451 if let Some(room) = client.get_room(&room_id) {
452 if let Ok(Some(new_room_member)) = room.get_member(user_id).await {
453 println!("Enqueueing user profile update for user {user_id}, who went from {}ignored to {}ignored.",
454 if room_member.is_ignored() { "" } else { "un" },
455 if new_room_member.is_ignored() { "" } else { "un" },
456 );
457 enqueue_user_profile_update(UserProfileUpdate::RoomMemberOnly {
458 room_id: room_id.clone(),
459 room_member: new_room_member,
460 });
461 }
462 }
463
464 // After successfully (un)ignoring a user, all timelines are fully cleared by the Matrix SDK.
465 // Therefore, we need to re-fetch all timelines for all rooms,
466 // and currently the only way to actually accomplish this is via pagination.
467 // See: <https://github.com/matrix-org/matrix-rust-sdk/issues/1703#issuecomment-2250297923>
468 //
469 // Note that here we only proactively re-paginate the *current* room
470 // (the one being viewed by the user when this ignore request was issued),
471 // and all other rooms will be re-paginated in `handle_ignore_user_list_subscriber()`.`
472 submit_async_request(MatrixRequest::PaginateRoomTimeline {
473 room_id,
474 num_events: 50,
475 direction: PaginationDirection::Backwards,
476 });
477 });
478 }
479
480 MatrixRequest::SendTypingNotice { room_id, typing } => {
481 let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
482 eprintln!("BUG: client/room not found for typing notice request {room_id}");
483 continue;
484 };
485 let _typing_task = Handle::current().spawn(async move {
486 if let Err(e) = room.typing_notice(typing).await {
487 eprintln!("Failed to send typing notice to room {room_id}: {e:?}");
488 }
489 });
490 }
491
492 MatrixRequest::SubscribeToTypingNotices { room_id, subscribe } => {
493 let (room, timeline_update_sender, mut typing_notice_receiver) = {
494 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
495 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
496 println!("BUG: room info not found for subscribe to typing notices request, room {room_id}");
497 continue;
498 };
499 let (room, recv) = if subscribe {
500 if room_info.typing_notice_subscriber.is_some() {
501 println!(
502 "Note: room {room_id} is already subscribed to typing notices."
503 );
504 continue;
505 } else {
506 let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
507 eprintln!("BUG: client/room not found when subscribing to typing notices request, room: {room_id}");
508 continue;
509 };
510 let (drop_guard, recv) = room.subscribe_to_typing_notifications();
511 room_info.typing_notice_subscriber = Some(drop_guard);
512 (room, recv)
513 }
514 } else {
515 room_info.typing_notice_subscriber.take();
516 continue;
517 };
518 // Here: we don't have an existing subscriber running, so we fall through and start one.
519 (room, room_info.timeline_update_sender.clone(), recv)
520 };
521
522 let _typing_notices_task = Handle::current().spawn(async move {
523 while let Ok(user_ids) = typing_notice_receiver.recv().await {
524 // println!("Received typing notifications for room {room_id}: {user_ids:?}");
525 let mut users = Vec::with_capacity(user_ids.len());
526 for user_id in user_ids {
527 users.push(
528 room.get_member_no_sync(&user_id)
529 .await
530 .ok()
531 .flatten()
532 .and_then(|m| m.display_name().map(|d| d.to_owned()))
533 .unwrap_or_else(|| user_id.to_string())
534 );
535 }
536 if let Err(e) = timeline_update_sender.send(TimelineUpdate::TypingUsers { users }) {
537 eprintln!("Error: timeline update sender couldn't send the list of typing users: {e:?}");
538 }
539 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
540 }
541 // println!("Note: typing notifications recv loop has ended for room {}", room_id);
542 });
543 }
544 MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id, subscribe } => {
545 if !subscribe {
546 if let Some(task_handler) = tasks_list.remove(&room_id) {
547 task_handler.abort();
548 }
549 continue;
550 }
551 let (timeline, sender) = {
552 let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
553 let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
554 println!("BUG: room info not found for subscribe to own user read receipts changed request, room {room_id}");
555 continue;
556 };
557 (
558 room_info.timeline.clone(),
559 room_info.timeline_update_sender.clone(),
560 )
561 };
562
563 let subscribe_own_read_receipt_task = Handle::current().spawn(async move {
564 let update_receiver = timeline.subscribe_own_user_read_receipts_changed().await;
565 pin_mut!(update_receiver);
566 if let Some(client_user_id) = current_user_id() {
567 if let Some((event_id, receipt)) =
568 timeline.latest_user_read_receipt(&client_user_id).await
569 {
570 println!("Received own user read receipt: {receipt:?} {event_id:?}");
571 if let Err(e) = sender.send(TimelineUpdate::OwnUserReadReceipt(receipt))
572 {
573 eprintln!("Failed to get own user read receipt: {e:?}");
574 }
575 }
576
577 while (update_receiver.next().await).is_some() {
578 if let Some((_, receipt)) =
579 timeline.latest_user_read_receipt(&client_user_id).await
580 {
581 if let Err(e) =
582 sender.send(TimelineUpdate::OwnUserReadReceipt(receipt))
583 {
584 eprintln!("Failed to get own user read receipt: {e:?}");
585 }
586 }
587 }
588 }
589 });
590 tasks_list.insert(room_id.clone(), subscribe_own_read_receipt_task);
591 }
592 // MatrixRequest::SpawnSSOServer {
593 // brand,
594 // homeserver_url,
595 // identity_provider_id,
596 // } => {
597 // spawn_sso_server(
598 // brand,
599 // homeserver_url,
600 // identity_provider_id,
601 // login_sender.clone(),
602 // )
603 // .await;
604 // }
605 MatrixRequest::ResolveRoomAlias(room_alias) => {
606 let Some(client) = CLIENT.get() else { continue };
607 let _resolve_task = Handle::current().spawn(async move {
608 println!("Sending resolve room alias request for {room_alias}...");
609 let res = client.resolve_room_alias(&room_alias).await;
610 println!("Resolved room alias {room_alias} to: {res:?}");
611 todo!("Send the resolved room alias back to the UI thread somehow.");
612 });
613 }
614 // MatrixRequest::FetchAvatar {
615 // mxc_uri,
616 // on_fetched,
617 // } => {
618 // let Some(client) = CLIENT.get() else { continue };
619 // let _fetch_task = Handle::current().spawn(async move {
620 // // println!("Sending fetch avatar request for {mxc_uri:?}...");
621 // let media_request = MediaRequestParameters {
622 // source: MediaSource::Plain(mxc_uri.clone()),
623 // format: AVATAR_THUMBNAIL_FORMAT.into(),
624 // };
625 // let res = client.media().get_media_content(&media_request, true).await;
626 // // println!("Fetched avatar for {mxc_uri:?}, succeeded? {}", res.is_ok());
627 // on_fetched(AvatarUpdate {
628 // mxc_uri,
629 // avatar_data: res.map(|v| v.into()),
630 // });
631 // });
632 // }
633 // MatrixRequest::FetchMedia {
634 // media_request,
635 // on_fetched,
636 // destination,
637 // update_sender,
638 // } => {
639 // let Some(client) = CLIENT.get() else { continue };
640 // let media = client.media();
641
642 // let _fetch_task = Handle::current().spawn(async move {
643 // // println!("Sending fetch media request for {media_request:?}...");
644 // let res = media.get_media_content(&media_request, true).await;
645 // on_fetched(&destination, media_request, res, update_sender);
646 // });
647 // }
648 MatrixRequest::SendMessage {
649 room_id,
650 message,
651 // replied_to,
652 } => {
653 let timeline = {
654 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
655 let Some(room_info) = all_joined_rooms.get(&room_id) else {
656 println!("BUG: room info not found for send message request {room_id}");
657 continue;
658 };
659 room_info.timeline.clone()
660 };
661
662 // Spawn a new async task that will send the actual message.
663 let _send_message_task = Handle::current().spawn(async move {
664 println!("Sending message to room {room_id}: {message:?}...");
665 // if let Some(replied_to_info) = replied_to {
666 // match timeline
667 // .send_reply(message.into(), replied_to_info, ForwardThread::Yes)
668 // .await
669 // {
670 // Ok(_send_handle) => println!("Sent reply message to room {room_id}."),
671 // Err(_e) => {
672 // eprintln!("Failed to send reply message to room {room_id}: {_e:?}");
673 // enqueue_popup_notification(format!("Failed to send reply: {_e}"));
674 // }
675 // }
676 // } else {
677 match timeline.send(message.into()).await {
678 Ok(_send_handle) => println!("Sent message to room {room_id}."),
679 Err(_e) => {
680 eprintln!("Failed to send message to room {room_id}: {_e:?}");
681 enqueue_popup_notification(format!("Failed to send message: {_e}"));
682 }
683 }
684 // }
685 broadcast_event(UIUpdateMessage::RefreshUI)
686 .expect("Couldn't broadcast event to UI");
687 });
688 }
689
690 MatrixRequest::ReadReceipt { room_id, event_id } => {
691 let timeline = {
692 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
693 let Some(room_info) = all_joined_rooms.get(&room_id) else {
694 println!("BUG: room info not found when sending read receipt, room {room_id}, {event_id}");
695 continue;
696 };
697 room_info.timeline.clone()
698 };
699 let _send_rr_task = Handle::current().spawn(async move {
700 match timeline.send_single_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, event_id.clone()).await {
701 Ok(sent) => println!("{} read receipt to room {room_id} for event {event_id}", if sent { "Sent" } else { "Already sent" }),
702 Err(_e) => eprintln!("Failed to send read receipt to room {room_id} for event {event_id}; error: {_e:?}"),
703 }
704 // Also update the number of unread messages in the room.
705 enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
706 room_id: room_id.clone(),
707 count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
708 unread_mentions: timeline.room().num_unread_mentions()
709 });
710 });
711 }
712
713 MatrixRequest::FullyReadReceipt {
714 room_id, event_id, ..
715 } => {
716 let timeline = {
717 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
718 let Some(room_info) = all_joined_rooms.get(&room_id) else {
719 println!("BUG: room info not found when sending fully read receipt, room {room_id}, {event_id}");
720 continue;
721 };
722 room_info.timeline.clone()
723 };
724 let _send_frr_task = Handle::current().spawn(async move {
725 match timeline.send_single_receipt(ReceiptType::FullyRead, ReceiptThread::Unthreaded, event_id.clone()).await {
726 Ok(sent) => println!("{} fully read receipt to room {room_id} for event {event_id}",
727 if sent { "Sent" } else { "Already sent" }
728 ),
729 Err(_e) => eprintln!("Failed to send fully read receipt to room {room_id} for event {event_id}; error: {_e:?}"),
730 }
731 // Also update the number of unread messages in the room.
732 enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
733 room_id: room_id.clone(),
734 count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
735 unread_mentions: timeline.room().num_unread_mentions()
736 });
737 });
738 }
739
740 MatrixRequest::GetRoomPowerLevels { room_id } => {
741 let (timeline, sender) = {
742 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
743 let Some(room_info) = all_joined_rooms.get(&room_id) else {
744 println!("BUG: room info not found for fetch members request {room_id}");
745 continue;
746 };
747
748 (
749 room_info.timeline.clone(),
750 room_info.timeline_update_sender.clone(),
751 )
752 };
753
754 let Some(user_id) = current_user_id() else {
755 continue;
756 };
757
758 let _power_levels_task = Handle::current().spawn(async move {
759 match timeline.room().power_levels().await {
760 Ok(power_levels) => {
761 println!("Successfully fetched power levels for room {room_id}.");
762 if let Err(e) = sender.send(TimelineUpdate::UserPowerLevels(
763 UserPowerLevels::from(&power_levels, &user_id),
764 )) {
765 eprintln!(
766 "Failed to send the result of if user can send message: {e}"
767 )
768 }
769 broadcast_event(UIUpdateMessage::RefreshUI)
770 .expect("Couldn't broadcast event to UI");
771 }
772 Err(e) => {
773 eprintln!("Failed to fetch power levels for room {room_id}: {e:?}");
774 }
775 }
776 });
777 }
778 MatrixRequest::ToggleReaction {
779 room_id,
780 timeline_event_id,
781 reaction,
782 } => {
783 let timeline = {
784 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
785 let Some(room_info) = all_joined_rooms.get(&room_id) else {
786 println!("BUG: room info not found for send toggle reaction {room_id}");
787 continue;
788 };
789 room_info.timeline.clone()
790 };
791
792 let _toggle_reaction_task = Handle::current().spawn(async move {
793 println!("Toggle Reaction to room {room_id}: ...");
794 match timeline.toggle_reaction(&timeline_event_id, &reaction).await {
795 Ok(_send_handle) => {
796 broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
797 println!("Sent toggle reaction to room {room_id} {reaction}.")
798 },
799 Err(_e) => eprintln!("Failed to send toggle reaction to room {room_id} {reaction}; error: {_e:?}"),
800 }
801 });
802 }
803 MatrixRequest::RedactMessage {
804 room_id,
805 timeline_event_id,
806 reason,
807 } => {
808 let timeline = {
809 let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
810 let Some(room_info) = all_joined_rooms.get(&room_id) else {
811 println!("BUG: room info not found for redact message {room_id}");
812 continue;
813 };
814 room_info.timeline.clone()
815 };
816
817 let _redact_task = Handle::current().spawn(async move {
818 match timeline.redact(&timeline_event_id, reason.as_deref()).await {
819 Ok(()) => println!("Successfully redacted message in room {room_id}."),
820 Err(e) => {
821 eprintln!("Failed to redact message in {room_id}; error: {e:?}");
822 enqueue_popup_notification(format!(
823 "Failed to redact message. Error: {e}"
824 ));
825 }
826 }
827 });
828 }
829 MatrixRequest::GetMatrixRoomLinkPillInfo { matrix_id, via } => {
830 let Some(client) = CLIENT.get() else { continue };
831 let _fetch_matrix_link_pill_info_task = Handle::current().spawn(async move {
832 let room_or_alias_id: Option<&RoomOrAliasId> = match &matrix_id {
833 MatrixId::Room(room_id) => Some((&**room_id).into()),
834 MatrixId::RoomAlias(room_alias_id) => Some((&**room_alias_id).into()),
835 MatrixId::Event(room_or_alias_id, _event_id) => Some(room_or_alias_id),
836 _ => {
837 println!("MatrixLinkRoomPillInfoRequest: Unsupported MatrixId type: {matrix_id:?}");
838 return;
839 }
840 };
841 if let Some(room_or_alias_id) = room_or_alias_id {
842 match client.get_room_preview(room_or_alias_id, via).await {
843 Ok(_preview) => {},
844 // Cx::post_action(MatrixLinkPillState::Loaded {
845 // matrix_id: matrix_id.clone(),
846 // name: preview.name.unwrap_or_else(|| room_or_alias_id.to_string()),
847 // avatar_url: preview.avatar_url
848 // }),
849 Err(_e) => {
850 println!("Failed to get room link pill info for {room_or_alias_id:?}: {_e:?}");
851 }
852 };
853 }
854 });
855 }
856 _ => bail!("Not implemented yet"),
857 }
858 }
859
860 eprintln!("async_worker task ended unexpectedly");
861 bail!("async_worker task ended unexpectedly")
862}
863
864/// Worker that loops to update rooms_list updates in queue
865/// currently it handles active_room updates outside the other actions,
866/// but maybe I should handle this as every other action
867pub async fn ui_worker<R: Runtime>(app_handle: AppHandle<R>) -> anyhow::Result<()> {
868 let rooms_list = Arc::new(Mutex::new(RoomsList::new()));
869 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
870
871 // create UI subscriber
872 let mut ui_subscriber = debounce_broadcast(
873 super::singletons::subscribe_to_events().expect("Couldn't get UI subscriber event"),
874 Duration::from_millis(200),
875 );
876
877 // Event listener sends to channel instead of directly accessing rooms_list
878 let listener_tx = tx.clone();
879 app_handle.listen(
880 MatrixSvelteListenEvent::MatrixUpdateCurrentActiveRoom.as_str(),
881 move |event| {
882 println!(
883 "Received event to change current active room ! Body: {:?}",
884 event.payload()
885 );
886 let tx_clone = listener_tx.clone();
887 tauri::async_runtime::spawn(async move {
888 if let Ok(payload) =
889 serde_json::from_str::<MatrixUpdateCurrentActiveRoom>(&event.payload())
890 {
891 println!("payload: {payload:?}");
892 let (room_id, room_name) = if let Some(id) = payload.room_id {
893 let test = Some(OwnedRoomId::try_from(id).unwrap());
894 println!("test: {test:?}");
895 (test, payload.room_name)
896 } else {
897 (None, None)
898 };
899 println!("room id: {room_id:?}");
900 println!("room_name: {room_name:?}");
901 tx_clone.send((room_id, room_name)).unwrap();
902 }
903 });
904 },
905 );
906
907 loop {
908 tokio::select! {
909 // Handle incoming events from listener
910 Some((room_id, room_name)) = rx.recv() => {
911 let mut lock = rooms_list.lock().await;
912 lock.handle_current_active_room(&app_handle, room_id, room_name)
913 .expect("Couldn't set the room screen");
914 }
915
916 // Listen to UI refresh events
917 _ = ui_subscriber.recv() => {
918 let mut lock = rooms_list.lock().await;
919 lock.handle_rooms_list_updates(&app_handle).await;
920 }
921 }
922 }
923}