tauri_plugin_matrix_svelte/matrix/
workers.rs

1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2
3use anyhow::bail;
4use futures::{pin_mut, StreamExt};
5use matrix_sdk::{
6    ruma::{
7        api::client::receipt::create_receipt::v3::ReceiptType, events::receipt::ReceiptThread,
8        matrix_uri::MatrixId, OwnedRoomId, RoomOrAliasId,
9    },
10    Client,
11};
12use tauri::{AppHandle, Listener, Manager, Runtime};
13use tokio::{
14    runtime::Handle,
15    sync::{mpsc::UnboundedReceiver, Mutex},
16    task::JoinHandle,
17};
18
19use crate::{
20    matrix::{
21        notifications::enqueue_popup_notification,
22        requests::submit_async_request,
23        room::rooms_list::{enqueue_rooms_list_update, RoomsCollectionStatus, RoomsListUpdate},
24        rooms::UnreadMessageCount,
25        singletons::{broadcast_event, UIUpdateMessage, ALL_JOINED_ROOMS, CLIENT},
26        sync::sync,
27        timeline::{PaginationDirection, TimelineUpdate},
28        user_power_level::UserPowerLevels,
29        user_profile::{enqueue_user_profile_update, UserProfileUpdate},
30        utils::current_user_id,
31    },
32    models::matrix::{MatrixSvelteListenEvent, MatrixUpdateCurrentActiveRoom},
33};
34
35use super::{requests::MatrixRequest, room::rooms_list::RoomsList, utils::debounce_broadcast};
36
37/// The main loop that actually uses a Matrix client
38pub async fn async_main_loop<R: Runtime>(
39    app_handle: AppHandle<R>,
40    client: Client,
41) -> anyhow::Result<()> {
42    let logged_in_user_id = client
43        .user_id()
44        .expect("BUG: client.user_id() returned None after successful login!");
45    let status = RoomsCollectionStatus::Loading(format!(
46        "Logged in as {}.\n → Loading rooms...",
47        logged_in_user_id
48    ));
49    // enqueue_popup_notification(status.clone());
50    enqueue_rooms_list_update(RoomsListUpdate::Status { status });
51
52    // Listen for updates to the ignored user list.
53    // handle_ignore_user_list_subscriber(client.clone());
54    let rooms_list_handle = app_handle.app_handle().clone();
55    tauri::async_runtime::spawn(ui_worker(rooms_list_handle));
56
57    // call sync
58    sync(&app_handle, client).await?;
59
60    bail!("room list service sync loop ended unexpectedly")
61}
62
63/// The entry point for an async worker thread that can run async tasks.
64///
65/// All this thread does is wait for [`MatrixRequest`]s from the main UI-driven non-async thread(s)
66/// and then execute them within an async runtime context.
67pub async fn async_worker(
68    mut request_receiver: UnboundedReceiver<MatrixRequest>,
69) -> anyhow::Result<()> {
70    println!("Started async_worker task.");
71    let mut tasks_list: BTreeMap<OwnedRoomId, JoinHandle<()>> = BTreeMap::new();
72    while let Some(request) = request_receiver.recv().await {
73        match request {
74            MatrixRequest::PaginateRoomTimeline {
75                room_id,
76                num_events,
77                direction,
78            } => {
79                let (timeline, sender) = {
80                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
81                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
82                        println!("Skipping pagination request for not-yet-known room {room_id}");
83                        continue;
84                    };
85
86                    let timeline_ref = room_info.timeline.clone();
87                    let sender = room_info.timeline_update_sender.clone();
88                    (timeline_ref, sender)
89                };
90
91                // Spawn a new async task that will make the actual pagination request.
92                let _paginate_task = Handle::current().spawn(async move {
93                    println!("Starting {direction} pagination request for room {room_id}...");
94                    sender.send(TimelineUpdate::PaginationRunning(direction)).unwrap();
95                    broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
96
97                    let res = if direction == PaginationDirection::Forwards {
98                        timeline.paginate_forwards(num_events).await
99                    } else {
100                        timeline.paginate_backwards(num_events).await
101                    };
102
103                    match res {
104                        Ok(fully_paginated) => {
105                            println!("Completed {direction} pagination request for room {room_id}, hit {} of timeline? {}",
106                                if direction == PaginationDirection::Forwards { "end" } else { "start" },
107                                if fully_paginated { "yes" } else { "no" },
108                            );
109                            sender.send(TimelineUpdate::PaginationIdle {
110                                fully_paginated,
111                                direction,
112                            }).unwrap();
113                            broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
114                        }
115                        Err(error) => {
116                            eprintln!("Error sending {direction} pagination request for room {room_id}: {error:?}");
117                            sender.send(TimelineUpdate::PaginationError {
118                                error,
119                                direction,
120                            }).unwrap();
121                            broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
122                        }
123                    }
124                });
125            }
126
127            MatrixRequest::EditMessage {
128                room_id,
129                timeline_event_item_id: timeline_event_id,
130                edited_content,
131            } => {
132                let (timeline, sender) = {
133                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
134                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
135                        eprintln!("BUG: room info not found for edit request, room {room_id}");
136                        continue;
137                    };
138                    (
139                        room_info.timeline.clone(),
140                        room_info.timeline_update_sender.clone(),
141                    )
142                };
143
144                // Spawn a new async task that will make the actual edit request.
145                let _edit_task = Handle::current().spawn(async move {
146                    println!("Sending request to edit message {timeline_event_id:?} in room {room_id}...");
147                    let result = timeline.edit(&timeline_event_id, edited_content).await;
148                    match result {
149                        Ok(_) => println!("Successfully edited message {timeline_event_id:?} in room {room_id}."),
150                        Err(ref e) => eprintln!("Error editing message {timeline_event_id:?} in room {room_id}: {e:?}"),
151                    }
152                    sender.send(TimelineUpdate::MessageEdited {
153                        timeline_event_id,
154                        result,
155                    }).unwrap();
156                    broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
157                });
158            }
159
160            MatrixRequest::FetchDetailsForEvent { room_id, event_id } => {
161                let (timeline, sender) = {
162                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
163                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
164                        eprintln!("BUG: room info not found for fetch details for event request {room_id}");
165                        continue;
166                    };
167
168                    (
169                        room_info.timeline.clone(),
170                        room_info.timeline_update_sender.clone(),
171                    )
172                };
173
174                // Spawn a new async task that will make the actual fetch request.
175                let _fetch_task = Handle::current().spawn(async move {
176                    // println!("Sending request to fetch details for event {event_id} in room {room_id}...");
177                    let result = timeline.fetch_details_for_event(&event_id).await;
178                    match result {
179                        Ok(_) => {
180                            // println!("Successfully fetched details for event {event_id} in room {room_id}.");
181                        }
182                        Err(ref _e) => {
183                            // eprintln!("Error fetching details for event {event_id} in room {room_id}: {e:?}");
184                        }
185                    }
186                    sender
187                        .send(TimelineUpdate::EventDetailsFetched { event_id, result })
188                        .unwrap();
189                    broadcast_event(UIUpdateMessage::RefreshUI)
190                        .expect("Couldn't broadcast event to UI");
191                });
192            }
193
194            MatrixRequest::SyncRoomMemberList { room_id } => {
195                let (timeline, sender) = {
196                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
197                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
198                        eprintln!("BUG: room info not found for fetch members request {room_id}");
199                        continue;
200                    };
201
202                    (
203                        room_info.timeline.clone(),
204                        room_info.timeline_update_sender.clone(),
205                    )
206                };
207
208                // Spawn a new async task that will make the actual fetch request.
209                let _fetch_task = Handle::current().spawn(async move {
210                    println!("Sending sync room members request for room {room_id}...");
211                    timeline.fetch_members().await;
212                    println!("Completed sync room members request for room {room_id}.");
213                    sender.send(TimelineUpdate::RoomMembersSynced).unwrap();
214                    broadcast_event(UIUpdateMessage::RefreshUI)
215                        .expect("Couldn't broadcast event to UI");
216                });
217            }
218
219            // MatrixRequest::JoinRoom { room_id } => {
220            //     let Some(client) = CLIENT.get() else { continue };
221            //     let _join_room_task = Handle::current().spawn(async move {
222            //         println!("Sending request to join room {room_id}...");
223            //         let result_action = if let Some(room) = client.get_room(&room_id) {
224            //             match room.join().await {
225            //                 Ok(()) => {
226            //                     println!("Successfully joined room {room_id}.");
227            //                     JoinRoomAction::Joined { room_id }
228            //                 }
229            //                 Err(e) => {
230            //                     eprintln!("Error joining room {room_id}: {e:?}");
231            //                     JoinRoomAction::Failed { room_id, error: e }
232            //                 }
233            //             }
234            //         } else {
235            //             eprintln!("BUG: client could not get room with ID {room_id}");
236            //             JoinRoomAction::Failed {
237            //                 room_id,
238            //                 error: matrix_sdk::Error::UnknownError(
239            //                     String::from("Client couldn't locate room to join it.").into(),
240            //                 ),
241            //             }
242            //         };
243            //         Cx::post_action(result_action);
244            //     });
245            // }
246            // MatrixRequest::LeaveRoom { room_id } => {
247            //     let Some(client) = CLIENT.get() else { continue };
248            //     let _leave_room_task = Handle::current().spawn(async move {
249            //         println!("Sending request to leave room {room_id}...");
250            //         let result_action = if let Some(room) = client.get_room(&room_id) {
251            //             match room.leave().await {
252            //                 Ok(()) => {
253            //                     println!("Successfully left room {room_id}.");
254            //                     LeaveRoomAction::Left { room_id }
255            //                 }
256            //                 Err(e) => {
257            //                     eprintln!("Error leaving room {room_id}: {e:?}");
258            //                     LeaveRoomAction::Failed {
259            //                         room_id,
260            //                         error: e.to_string(),
261            //                     }
262            //                 }
263            //             }
264            //         } else {
265            //             eprintln!("BUG: client could not get room with ID {room_id}");
266            //             LeaveRoomAction::Failed {
267            //                 room_id,
268            //                 error: String::from("Client couldn't locate room to leave it."),
269            //             }
270            //         };
271            //         Cx::post_action(result_action);
272            //     });
273            // }
274            MatrixRequest::GetRoomMembers {
275                room_id,
276                memberships,
277                local_only,
278            } => {
279                let (timeline, sender) = {
280                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
281                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
282                        println!("BUG: room info not found for get room members request {room_id}");
283                        continue;
284                    };
285                    (
286                        room_info.timeline.clone(),
287                        room_info.timeline_update_sender.clone(),
288                    )
289                };
290
291                let _get_members_task = Handle::current().spawn(async move {
292                    let room = timeline.room();
293
294                    if local_only {
295                        if let Ok(members) = room.members_no_sync(memberships).await {
296                            println!(
297                                "Got {} members from cache for room {}",
298                                members.len(),
299                                room_id
300                            );
301                            sender
302                                .send(TimelineUpdate::RoomMembersListFetched { members })
303                                .unwrap();
304                        }
305                    } else {
306                        if let Ok(members) = room.members(memberships).await {
307                            println!(
308                                "Successfully fetched {} members from server for room {}",
309                                members.len(),
310                                room_id
311                            );
312                            sender
313                                .send(TimelineUpdate::RoomMembersListFetched { members })
314                                .unwrap();
315                        }
316                    }
317
318                    broadcast_event(UIUpdateMessage::RefreshUI)
319                        .expect("Couldn't broadcast event to UI");
320                });
321            }
322
323            // MatrixRequest::GetUserProfile {
324            //     user_id,
325            //     room_id,
326            //     local_only,
327            // } => {
328            //     let Some(client) = CLIENT.get() else { continue };
329            //     let _fetch_task = Handle::current().spawn(async move {
330            //         // println!("Sending get user profile request: user: {user_id}, \
331            //         //     room: {room_id:?}, local_only: {local_only}...",
332            //         // );
333
334            //         let mut update = None;
335
336            //         if let Some(room_id) = room_id.as_ref() {
337            //             if let Some(room) = client.get_room(room_id) {
338            //                 let member = if local_only {
339            //                     room.get_member_no_sync(&user_id).await
340            //                 } else {
341            //                     room.get_member(&user_id).await
342            //                 };
343            //                 if let Ok(Some(room_member)) = member {
344            //                     update = Some(UserProfileUpdate::Full {
345            //                         new_profile: UserProfile {
346            //                             username: room_member.display_name().map(|u| u.to_owned()),
347            //                             user_id: user_id.clone(),
348            //                             avatar_state: AvatarState::Known(room_member.avatar_url().map(|u| u.to_owned())),
349            //                         },
350            //                         room_id: room_id.to_owned(),
351            //                         room_member,
352            //                     });
353            //                 } else {
354            //                     println!("User profile request: user {user_id} was not a member of room {room_id}");
355            //                 }
356            //             } else {
357            //                 println!("User profile request: client could not get room with ID {room_id}");
358            //             }
359            //         }
360
361            //         if !local_only {
362            //             if update.is_none() {
363            //                 if let Ok(response) = client.account().fetch_user_profile_of(&user_id).await {
364            //                     update = Some(UserProfileUpdate::UserProfileOnly(
365            //                         UserProfile {
366            //                             username: response.displayname,
367            //                             user_id: user_id.clone(),
368            //                             avatar_state: AvatarState::Known(response.avatar_url),
369            //                         }
370            //                     ));
371            //                 } else {
372            //                     println!("User profile request: client could not get user with ID {user_id}");
373            //                 }
374            //             }
375
376            //             match update.as_mut() {
377            //                 Some(UserProfileUpdate::Full { new_profile: UserProfile { username, .. }, .. }) if username.is_none() => {
378            //                     if let Ok(response) = client.account().fetch_user_profile_of(&user_id).await {
379            //                         *username = response.displayname;
380            //                     }
381            //                 }
382            //                 _ => { }
383            //             }
384            //         }
385
386            //         if let Some(upd) = update {
387            //             // println!("Successfully completed get user profile request: user: {user_id}, room: {room_id:?}, local_only: {local_only}.");
388            //             enqueue_user_profile_update(upd);
389            //         } else {
390            //             println!("Failed to get user profile: user: {user_id}, room: {room_id:?}, local_only: {local_only}.");
391            //         }
392            //     });
393            // }
394            MatrixRequest::GetNumberUnreadMessages { room_id } => {
395                let (timeline, sender) = {
396                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
397                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
398                        println!("Skipping get number of unread messages request for not-yet-known room {room_id}");
399                        continue;
400                    };
401
402                    (
403                        room_info.timeline.clone(),
404                        room_info.timeline_update_sender.clone(),
405                    )
406                };
407                let _get_unreads_task = Handle::current().spawn(async move {
408                    match sender.send(TimelineUpdate::NewUnreadMessagesCount(
409                        UnreadMessageCount::Known(timeline.room().num_unread_messages())
410                    )) {
411                        Ok(_) => {
412                            broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
413                        },
414                        Err(e) => println!("Failed to send timeline update: {e:?} for GetNumberUnreadMessages request for room {room_id}"),
415                    }
416                    enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
417                        room_id: room_id.clone(),
418                        count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
419                        unread_mentions:timeline.room().num_unread_mentions(),
420                    });
421                });
422            }
423            MatrixRequest::IgnoreUser {
424                ignore,
425                room_member,
426                room_id,
427            } => {
428                let Some(client) = CLIENT.get() else { continue };
429                let _ignore_task = Handle::current().spawn(async move {
430                    let user_id = room_member.user_id();
431                    println!("Sending request to {}ignore user: {user_id}...", if ignore { "" } else { "un" });
432                    let ignore_result = if ignore {
433                        room_member.ignore().await
434                    } else {
435                        room_member.unignore().await
436                    };
437
438                    println!("{} user {user_id} {}",
439                        if ignore { "Ignoring" } else { "Unignoring" },
440                        if ignore_result.is_ok() { "succeeded." } else { "failed." },
441                    );
442
443                    if ignore_result.is_err() {
444                        return;
445                    }
446
447                    // We need to re-acquire the `RoomMember` object now that its state
448                    // has changed, i.e., the user has been (un)ignored.
449                    // We then need to send an update to replace the cached `RoomMember`
450                    // with the now-stale ignored state.
451                    if let Some(room) = client.get_room(&room_id) {
452                        if let Ok(Some(new_room_member)) = room.get_member(user_id).await {
453                            println!("Enqueueing user profile update for user {user_id}, who went from {}ignored to {}ignored.",
454                                if room_member.is_ignored() { "" } else { "un" },
455                                if new_room_member.is_ignored() { "" } else { "un" },
456                            );
457                            enqueue_user_profile_update(UserProfileUpdate::RoomMemberOnly {
458                                room_id: room_id.clone(),
459                                room_member: new_room_member,
460                            });
461                        }
462                    }
463
464                    // After successfully (un)ignoring a user, all timelines are fully cleared by the Matrix SDK.
465                    // Therefore, we need to re-fetch all timelines for all rooms,
466                    // and currently the only way to actually accomplish this is via pagination.
467                    // See: <https://github.com/matrix-org/matrix-rust-sdk/issues/1703#issuecomment-2250297923>
468                    //
469                    // Note that here we only proactively re-paginate the *current* room
470                    // (the one being viewed by the user when this ignore request was issued),
471                    // and all other rooms will be re-paginated in `handle_ignore_user_list_subscriber()`.
472                    submit_async_request(MatrixRequest::PaginateRoomTimeline {
473                        room_id,
474                        num_events: 50,
475                        direction: PaginationDirection::Backwards,
476                    });
477                });
478            }
479
480            MatrixRequest::SendTypingNotice { room_id, typing } => {
481                let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
482                    eprintln!("BUG: client/room not found for typing notice request {room_id}");
483                    continue;
484                };
485                let _typing_task = Handle::current().spawn(async move {
486                    if let Err(e) = room.typing_notice(typing).await {
487                        eprintln!("Failed to send typing notice to room {room_id}: {e:?}");
488                    }
489                });
490            }
491
492            MatrixRequest::SubscribeToTypingNotices { room_id, subscribe } => {
493                let (room, timeline_update_sender, mut typing_notice_receiver) = {
494                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
495                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
496                        println!("BUG: room info not found for subscribe to typing notices request, room {room_id}");
497                        continue;
498                    };
499                    let (room, recv) = if subscribe {
500                        if room_info.typing_notice_subscriber.is_some() {
501                            println!(
502                                "Note: room {room_id} is already subscribed to typing notices."
503                            );
504                            continue;
505                        } else {
506                            let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
507                                eprintln!("BUG: client/room not found when subscribing to typing notices request, room: {room_id}");
508                                continue;
509                            };
510                            let (drop_guard, recv) = room.subscribe_to_typing_notifications();
511                            room_info.typing_notice_subscriber = Some(drop_guard);
512                            (room, recv)
513                        }
514                    } else {
515                        room_info.typing_notice_subscriber.take();
516                        continue;
517                    };
518                    // Here: we don't have an existing subscriber running, so we fall through and start one.
519                    (room, room_info.timeline_update_sender.clone(), recv)
520                };
521
522                let _typing_notices_task = Handle::current().spawn(async move {
523                    while let Ok(user_ids) = typing_notice_receiver.recv().await {
524                        // println!("Received typing notifications for room {room_id}: {user_ids:?}");
525                        let mut users = Vec::with_capacity(user_ids.len());
526                        for user_id in user_ids {
527                            users.push(
528                                room.get_member_no_sync(&user_id)
529                                    .await
530                                    .ok()
531                                    .flatten()
532                                    .and_then(|m| m.display_name().map(|d| d.to_owned()))
533                                    .unwrap_or_else(|| user_id.to_string())
534                            );
535                        }
536                        if let Err(e) = timeline_update_sender.send(TimelineUpdate::TypingUsers { users }) {
537                            eprintln!("Error: timeline update sender couldn't send the list of typing users: {e:?}");
538                        }
539                        broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
540                    }
541                    // println!("Note: typing notifications recv loop has ended for room {}", room_id);
542                });
543            }
544            MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id, subscribe } => {
545                if !subscribe {
546                    if let Some(task_handler) = tasks_list.remove(&room_id) {
547                        task_handler.abort();
548                    }
549                    continue;
550                }
551                let (timeline, sender) = {
552                    let mut all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
553                    let Some(room_info) = all_joined_rooms.get_mut(&room_id) else {
554                        println!("BUG: room info not found for subscribe to own user read receipts changed request, room {room_id}");
555                        continue;
556                    };
557                    (
558                        room_info.timeline.clone(),
559                        room_info.timeline_update_sender.clone(),
560                    )
561                };
562
563                let subscribe_own_read_receipt_task = Handle::current().spawn(async move {
564                    let update_receiver = timeline.subscribe_own_user_read_receipts_changed().await;
565                    pin_mut!(update_receiver);
566                    if let Some(client_user_id) = current_user_id() {
567                        if let Some((event_id, receipt)) =
568                            timeline.latest_user_read_receipt(&client_user_id).await
569                        {
570                            println!("Received own user read receipt: {receipt:?} {event_id:?}");
571                            if let Err(e) = sender.send(TimelineUpdate::OwnUserReadReceipt(receipt))
572                            {
573                                eprintln!("Failed to get own user read receipt: {e:?}");
574                            }
575                        }
576
577                        while (update_receiver.next().await).is_some() {
578                            if let Some((_, receipt)) =
579                                timeline.latest_user_read_receipt(&client_user_id).await
580                            {
581                                if let Err(e) =
582                                    sender.send(TimelineUpdate::OwnUserReadReceipt(receipt))
583                                {
584                                    eprintln!("Failed to get own user read receipt: {e:?}");
585                                }
586                            }
587                        }
588                    }
589                });
590                tasks_list.insert(room_id.clone(), subscribe_own_read_receipt_task);
591            }
592            // MatrixRequest::SpawnSSOServer {
593            //     brand,
594            //     homeserver_url,
595            //     identity_provider_id,
596            // } => {
597            //     spawn_sso_server(
598            //         brand,
599            //         homeserver_url,
600            //         identity_provider_id,
601            //         login_sender.clone(),
602            //     )
603            //     .await;
604            // }
605            MatrixRequest::ResolveRoomAlias(room_alias) => {
606                let Some(client) = CLIENT.get() else { continue };
607                let _resolve_task = Handle::current().spawn(async move {
608                    println!("Sending resolve room alias request for {room_alias}...");
609                    let res = client.resolve_room_alias(&room_alias).await;
610                    println!("Resolved room alias {room_alias} to: {res:?}");
611                    todo!("Send the resolved room alias back to the UI thread somehow.");
612                });
613            }
614            // MatrixRequest::FetchAvatar {
615            //     mxc_uri,
616            //     on_fetched,
617            // } => {
618            //     let Some(client) = CLIENT.get() else { continue };
619            //     let _fetch_task = Handle::current().spawn(async move {
620            //         // println!("Sending fetch avatar request for {mxc_uri:?}...");
621            //         let media_request = MediaRequestParameters {
622            //             source: MediaSource::Plain(mxc_uri.clone()),
623            //             format: AVATAR_THUMBNAIL_FORMAT.into(),
624            //         };
625            //         let res = client.media().get_media_content(&media_request, true).await;
626            //         // println!("Fetched avatar for {mxc_uri:?}, succeeded? {}", res.is_ok());
627            //         on_fetched(AvatarUpdate {
628            //             mxc_uri,
629            //             avatar_data: res.map(|v| v.into()),
630            //         });
631            //     });
632            // }
633            // MatrixRequest::FetchMedia {
634            //     media_request,
635            //     on_fetched,
636            //     destination,
637            //     update_sender,
638            // } => {
639            //     let Some(client) = CLIENT.get() else { continue };
640            //     let media = client.media();
641
642            //     let _fetch_task = Handle::current().spawn(async move {
643            //         // println!("Sending fetch media request for {media_request:?}...");
644            //         let res = media.get_media_content(&media_request, true).await;
645            //         on_fetched(&destination, media_request, res, update_sender);
646            //     });
647            // }
648            MatrixRequest::SendMessage {
649                room_id,
650                message,
651                // replied_to,
652            } => {
653                let timeline = {
654                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
655                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
656                        println!("BUG: room info not found for send message request {room_id}");
657                        continue;
658                    };
659                    room_info.timeline.clone()
660                };
661
662                // Spawn a new async task that will send the actual message.
663                let _send_message_task = Handle::current().spawn(async move {
664                    println!("Sending message to room {room_id}: {message:?}...");
665                    // if let Some(replied_to_info) = replied_to {
666                    //     match timeline
667                    //         .send_reply(message.into(), replied_to_info, ForwardThread::Yes)
668                    //         .await
669                    //     {
670                    //         Ok(_send_handle) => println!("Sent reply message to room {room_id}."),
671                    //         Err(_e) => {
672                    //             eprintln!("Failed to send reply message to room {room_id}: {_e:?}");
673                    //             enqueue_popup_notification(format!("Failed to send reply: {_e}"));
674                    //         }
675                    //     }
676                    // } else {
677                    match timeline.send(message.into()).await {
678                        Ok(_send_handle) => println!("Sent message to room {room_id}."),
679                        Err(_e) => {
680                            eprintln!("Failed to send message to room {room_id}: {_e:?}");
681                            enqueue_popup_notification(format!("Failed to send message: {_e}"));
682                        }
683                    }
684                    // }
685                    broadcast_event(UIUpdateMessage::RefreshUI)
686                        .expect("Couldn't broadcast event to UI");
687                });
688            }
689
690            MatrixRequest::ReadReceipt { room_id, event_id } => {
691                let timeline = {
692                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
693                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
694                        println!("BUG: room info not found when sending read receipt, room {room_id}, {event_id}");
695                        continue;
696                    };
697                    room_info.timeline.clone()
698                };
699                let _send_rr_task = Handle::current().spawn(async move {
700                    match timeline.send_single_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, event_id.clone()).await {
701                        Ok(sent) => println!("{} read receipt to room {room_id} for event {event_id}", if sent { "Sent" } else { "Already sent" }),
702                        Err(_e) => eprintln!("Failed to send read receipt to room {room_id} for event {event_id}; error: {_e:?}"),
703                    }
704                    // Also update the number of unread messages in the room.
705                    enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
706                        room_id: room_id.clone(),
707                        count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
708                        unread_mentions: timeline.room().num_unread_mentions()
709                    });
710                });
711            }
712
713            MatrixRequest::FullyReadReceipt {
714                room_id, event_id, ..
715            } => {
716                let timeline = {
717                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
718                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
719                        println!("BUG: room info not found when sending fully read receipt, room {room_id}, {event_id}");
720                        continue;
721                    };
722                    room_info.timeline.clone()
723                };
724                let _send_frr_task = Handle::current().spawn(async move {
725                    match timeline.send_single_receipt(ReceiptType::FullyRead, ReceiptThread::Unthreaded, event_id.clone()).await {
726                        Ok(sent) => println!("{} fully read receipt to room {room_id} for event {event_id}",
727                            if sent { "Sent" } else { "Already sent" }
728                        ),
729                        Err(_e) => eprintln!("Failed to send fully read receipt to room {room_id} for event {event_id}; error: {_e:?}"),
730                    }
731                    // Also update the number of unread messages in the room.
732                    enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
733                        room_id: room_id.clone(),
734                        count: UnreadMessageCount::Known(timeline.room().num_unread_messages()),
735                        unread_mentions: timeline.room().num_unread_mentions()
736                    });
737                });
738            }
739
740            MatrixRequest::GetRoomPowerLevels { room_id } => {
741                let (timeline, sender) = {
742                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
743                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
744                        println!("BUG: room info not found for fetch members request {room_id}");
745                        continue;
746                    };
747
748                    (
749                        room_info.timeline.clone(),
750                        room_info.timeline_update_sender.clone(),
751                    )
752                };
753
754                let Some(user_id) = current_user_id() else {
755                    continue;
756                };
757
758                let _power_levels_task = Handle::current().spawn(async move {
759                    match timeline.room().power_levels().await {
760                        Ok(power_levels) => {
761                            println!("Successfully fetched power levels for room {room_id}.");
762                            if let Err(e) = sender.send(TimelineUpdate::UserPowerLevels(
763                                UserPowerLevels::from(&power_levels, &user_id),
764                            )) {
765                                eprintln!(
766                                    "Failed to send the result of if user can send message: {e}"
767                                )
768                            }
769                            broadcast_event(UIUpdateMessage::RefreshUI)
770                                .expect("Couldn't broadcast event to UI");
771                        }
772                        Err(e) => {
773                            eprintln!("Failed to fetch power levels for room {room_id}: {e:?}");
774                        }
775                    }
776                });
777            }
778            MatrixRequest::ToggleReaction {
779                room_id,
780                timeline_event_id,
781                reaction,
782            } => {
783                let timeline = {
784                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
785                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
786                        println!("BUG: room info not found for send toggle reaction {room_id}");
787                        continue;
788                    };
789                    room_info.timeline.clone()
790                };
791
792                let _toggle_reaction_task = Handle::current().spawn(async move {
793                    println!("Toggle Reaction to room {room_id}: ...");
794                    match timeline.toggle_reaction(&timeline_event_id, &reaction).await {
795                        Ok(_send_handle) => {
796                            broadcast_event(UIUpdateMessage::RefreshUI).expect("Couldn't broadcast event to UI");
797                            println!("Sent toggle reaction to room {room_id} {reaction}.")
798                        },
799                        Err(_e) => eprintln!("Failed to send toggle reaction to room {room_id} {reaction}; error: {_e:?}"),
800                    }
801                });
802            }
803            MatrixRequest::RedactMessage {
804                room_id,
805                timeline_event_id,
806                reason,
807            } => {
808                let timeline = {
809                    let all_joined_rooms = ALL_JOINED_ROOMS.lock().unwrap();
810                    let Some(room_info) = all_joined_rooms.get(&room_id) else {
811                        println!("BUG: room info not found for redact message {room_id}");
812                        continue;
813                    };
814                    room_info.timeline.clone()
815                };
816
817                let _redact_task = Handle::current().spawn(async move {
818                    match timeline.redact(&timeline_event_id, reason.as_deref()).await {
819                        Ok(()) => println!("Successfully redacted message in room {room_id}."),
820                        Err(e) => {
821                            eprintln!("Failed to redact message in {room_id}; error: {e:?}");
822                            enqueue_popup_notification(format!(
823                                "Failed to redact message. Error: {e}"
824                            ));
825                        }
826                    }
827                });
828            }
829            MatrixRequest::GetMatrixRoomLinkPillInfo { matrix_id, via } => {
830                let Some(client) = CLIENT.get() else { continue };
831                let _fetch_matrix_link_pill_info_task = Handle::current().spawn(async move {
832                    let room_or_alias_id: Option<&RoomOrAliasId> = match &matrix_id {
833                        MatrixId::Room(room_id) => Some((&**room_id).into()),
834                        MatrixId::RoomAlias(room_alias_id) => Some((&**room_alias_id).into()),
835                        MatrixId::Event(room_or_alias_id, _event_id) => Some(room_or_alias_id),
836                        _ => {
837                            println!("MatrixLinkRoomPillInfoRequest: Unsupported MatrixId type: {matrix_id:?}");
838                            return;
839                        }
840                    };
841                    if let Some(room_or_alias_id) = room_or_alias_id {
842                        match client.get_room_preview(room_or_alias_id, via).await {
843                            Ok(_preview) => {},
844                            // Cx::post_action(MatrixLinkPillState::Loaded {
845                            //     matrix_id: matrix_id.clone(),
846                            //     name: preview.name.unwrap_or_else(|| room_or_alias_id.to_string()),
847                            //     avatar_url: preview.avatar_url
848                            // }),
849                            Err(_e) => {
850                                println!("Failed to get room link pill info for {room_or_alias_id:?}: {_e:?}");
851                            }
852                        };
853                    }
854                });
855            }
856            _ => bail!("Not implemented yet"),
857        }
858    }
859
860    eprintln!("async_worker task ended unexpectedly");
861    bail!("async_worker task ended unexpectedly")
862}
863
864/// Worker that loops to update rooms_list updates in queue
865/// currently it handles active_room updates outside the other actions,
866/// but maybe I should handle this as every other action
867pub async fn ui_worker<R: Runtime>(app_handle: AppHandle<R>) -> anyhow::Result<()> {
868    let rooms_list = Arc::new(Mutex::new(RoomsList::new()));
869    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
870
871    // create UI subscriber
872    let mut ui_subscriber = debounce_broadcast(
873        super::singletons::subscribe_to_events().expect("Couldn't get UI subscriber event"),
874        Duration::from_millis(200),
875    );
876
877    // Event listener sends to channel instead of directly accessing rooms_list
878    let listener_tx = tx.clone();
879    app_handle.listen(
880        MatrixSvelteListenEvent::MatrixUpdateCurrentActiveRoom.as_str(),
881        move |event| {
882            println!(
883                "Received event to change current active room ! Body: {:?}",
884                event.payload()
885            );
886            let tx_clone = listener_tx.clone();
887            tauri::async_runtime::spawn(async move {
888                if let Ok(payload) =
889                    serde_json::from_str::<MatrixUpdateCurrentActiveRoom>(&event.payload())
890                {
891                    println!("payload: {payload:?}");
892                    let (room_id, room_name) = if let Some(id) = payload.room_id {
893                        let test = Some(OwnedRoomId::try_from(id).unwrap());
894                        println!("test: {test:?}");
895                        (test, payload.room_name)
896                    } else {
897                        (None, None)
898                    };
899                    println!("room id: {room_id:?}");
900                    println!("room_name: {room_name:?}");
901                    tx_clone.send((room_id, room_name)).unwrap();
902                }
903            });
904        },
905    );
906
907    loop {
908        tokio::select! {
909            // Handle incoming events from listener
910            Some((room_id, room_name)) = rx.recv() => {
911                let mut lock = rooms_list.lock().await;
912                lock.handle_current_active_room(&app_handle, room_id, room_name)
913                    .expect("Couldn't set the room screen");
914            }
915
916            // Listen to UI refresh events
917            _ = ui_subscriber.recv() => {
918                let mut lock = rooms_list.lock().await;
919                lock.handle_rooms_list_updates(&app_handle).await;
920            }
921        }
922    }
923}