matrix_ui_serializable/
lib.rs

1use std::{path::PathBuf, sync::Arc, thread, time::Duration};
2
3use serde::{Serialize, ser::Serializer};
4use tokio::{runtime::Handle, sync::broadcast};
5
6use crate::{
7    init::singletons::{
8        CLIENT, EVENT_BRIDGE, REQUEST_SENDER, ROOM_CREATED_RECEIVER, TEMP_DIR,
9        VERIFICATION_RESPONSE_RECEIVER,
10    },
11    init::{
12        session::try_restore_session_to_state,
13        workers::{async_main_loop, async_worker},
14    },
15    models::{
16        event_bridge::EventBridge,
17        events::{
18            EmitEvent, MatrixRoomStoreCreatedRequest, MatrixUpdateCurrentActiveRoom,
19            MatrixVerificationResponse, ToastNotificationRequest, ToastNotificationVariant,
20        },
21        state_updater::StateUpdater,
22    },
23    room::{
24        notifications::enqueue_toast_notification,
25        rooms_list::{RoomsCollectionStatus, RoomsListUpdate, enqueue_rooms_list_update},
26    },
27};
28
29pub mod commands;
30pub(crate) mod events;
31pub(crate) mod init;
32pub mod models;
33pub(crate) mod room;
34pub(crate) mod stores;
35pub(crate) mod user;
36pub(crate) mod utils;
37
38pub type Result<T> = std::result::Result<T, Error>;
39
40/// matrix-ui-serializable Error enum
41#[derive(Debug, thiserror::Error)]
42pub enum Error {
43    #[error(transparent)]
44    Io(#[from] std::io::Error),
45    #[error(transparent)]
46    Anyhow(#[from] anyhow::Error),
47    #[error(transparent)]
48    MatrixSdk(#[from] matrix_sdk::Error),
49}
50
51impl Serialize for Error {
52    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
53    where
54        S: Serializer,
55    {
56        serializer.serialize_str(self.to_string().as_ref())
57    }
58}
59
60/// Required `mpsc:Receiver`s to listen to incoming events
61pub struct EventReceivers {
62    room_created_receiver: mpsc::Receiver<MatrixRoomStoreCreatedRequest>,
63    verification_response_receiver: mpsc::Receiver<MatrixVerificationResponse>,
64    room_update_receiver: mpsc::Receiver<MatrixUpdateCurrentActiveRoom>,
65}
66
67impl EventReceivers {
68    pub fn new(
69        room_created_receiver: mpsc::Receiver<MatrixRoomStoreCreatedRequest>,
70        verification_response_receiver: mpsc::Receiver<MatrixVerificationResponse>,
71        room_update_receiver: mpsc::Receiver<MatrixUpdateCurrentActiveRoom>,
72    ) -> Self {
73        Self {
74            room_created_receiver,
75            verification_response_receiver,
76            room_update_receiver,
77        }
78    }
79}
80
81/// The required configuration for this lib. Adapters must implement updaters and event_receivers.
82pub struct LibConfig {
83    /// The functions that will be in charge of updating the frontend states / stores
84    /// from the backend state
85    updaters: Box<dyn StateUpdater>,
86    /// To listen to events coming from the frontend, we use channels.
87    event_receivers: EventReceivers,
88    /// The session option stored (or not) by the adapter. It is serialized.
89    session_option: Option<String>,
90    /// The required configuration for mobile push notifications to work
91    mobile_push_notifications_config: Option<MobilePushNotificationConfig>,
92    /// A PathBuf to the temporary dir of the app
93    temp_dir: PathBuf,
94}
95
96impl LibConfig {
97    pub fn new(
98        updaters: Box<dyn StateUpdater>,
99        mobile_push_notifications_config: Option<MobilePushNotificationConfig>,
100        event_receivers: EventReceivers,
101        session_option: Option<String>,
102        temp_dir: PathBuf,
103    ) -> Self {
104        Self {
105            updaters,
106            mobile_push_notifications_config,
107            event_receivers,
108            session_option,
109            temp_dir,
110        }
111    }
112}
113
114/// Function to be called once your app is starting to init this lib.
115/// This will start the workers and return a `Receiver` to forward outgoing events.
116pub fn init(config: LibConfig) -> broadcast::Receiver<EmitEvent> {
117    // Lib -> adapter events
118    let (event_bridge, broadcast_receiver) = EventBridge::new();
119    EVENT_BRIDGE
120        .set(event_bridge)
121        .expect("Couldn't set the event bridge");
122
123    // Adapter -> lib events
124    ROOM_CREATED_RECEIVER
125        .set(tokio::sync::Mutex::new(
126            config.event_receivers.room_created_receiver,
127        ))
128        .expect("Couldn't set room created receiver");
129
130    VERIFICATION_RESPONSE_RECEIVER
131        .set(tokio::sync::Mutex::new(
132            config.event_receivers.verification_response_receiver,
133        ))
134        .expect("Couldn't set the verification response receiver");
135
136    // Create a channel to be used between UI thread(s) and the async worker thread.
137    crate::init::singletons::init_broadcaster(16).expect("Couldn't init the UI broadcaster"); // TODO: adapt capacity if needed
138
139    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MatrixRequest>();
140    REQUEST_SENDER
141        .set(sender)
142        .expect("BUG: REQUEST_SENDER already set!");
143
144    TEMP_DIR
145        .set(config.temp_dir)
146        .expect("Couldn't set temporary dir");
147
148    let _monitor = Handle::current().spawn(async move {
149        let client = try_restore_session_to_state(
150            config.session_option,
151            config.mobile_push_notifications_config,
152        )
153        .await
154        .expect("Couldn't try to restore session");
155
156        LOGIN_STORE_READY.wait();
157        let client = match client {
158            Some(new_login) => {
159                let _ = &config
160                    .updaters
161                    .update_login_state(
162                        LoginState::Restored,
163                        new_login
164                            .user_id()
165                            .map_or(None, |val| Some(val.to_string())),
166                    )
167                    .expect("Couldn't update login state");
168                new_login
169            }
170            None => {
171                println!("Waiting for login request...");
172                thread::sleep(Duration::from_secs(3)); // Block the thread for 3 secs to let the frontend init itself.
173                let _ = &config
174                    .updaters
175                    .update_login_state(LoginState::AwaitingForLogin, None)
176                    .expect("Couldn't update login state");
177                // We await frontend to call the login command and set the client
178                // loop until client is set
179                CLIENT.wait();
180                let client = CLIENT.get().unwrap().clone();
181                let _ = &config
182                    .updaters
183                    .update_login_state(
184                        LoginState::LoggedIn,
185                        client
186                            .user_id()
187                            .clone()
188                            .map_or(None, |val| Some(val.to_string())),
189                    )
190                    .expect("Couldn't update login state");
191                client
192            }
193        };
194
195        let mut verification_subscriber = client.encryption().verification_state();
196
197        let updaters_arc = Arc::new(config.updaters);
198        let inner_updaters = updaters_arc.clone();
199        tokio::task::spawn(async move {
200            while let Some(state) = verification_subscriber.next().await {
201                inner_updaters
202                    .clone()
203                    .update_verification_state(FrontendVerificationState::new(state))
204                    .expect("Couldn't update verification state in Svelte Store");
205            }
206        });
207
208        let mut ui_event_receiver =
209            crate::init::singletons::subscribe_to_events().expect("Couldn't get UI event receiver"); // subscribe to events so the sender(s) never fail
210
211        // Spawn the actual async worker thread.
212        let mut worker_join_handle = Handle::current().spawn(async_worker(receiver));
213
214        // // Start the main loop that drives the Matrix client SDK.
215        let mut main_loop_join_handle = Handle::current().spawn(async_main_loop(
216            client,
217            updaters_arc,
218            config.event_receivers.room_update_receiver,
219        ));
220
221        #[allow(clippy::never_loop)] // unsure if needed, just following tokio's examples.
222        loop {
223            tokio::select! {
224                result = &mut main_loop_join_handle => {
225                    match result {
226                        Ok(Ok(())) => {
227                            eprintln!("BUG: main async loop task ended unexpectedly!");
228                        }
229                        Ok(Err(e)) => {
230                            eprintln!("Error: main async loop task ended:\n\t{e:?}");
231                            enqueue_rooms_list_update(RoomsListUpdate::Status {
232                                status: RoomsCollectionStatus::Error(e.to_string()),
233                            });
234                            enqueue_toast_notification(ToastNotificationRequest::new(
235                                format!("Rooms list update error: {e}"),
236                                None,
237                                ToastNotificationVariant::Error,
238                            ));
239                        },
240                        Err(e) => {
241                            eprintln!("BUG: failed to join main async loop task: {e:?}");
242                        }
243                    }
244                    break;
245                }
246                result = &mut worker_join_handle => {
247                    match result {
248                        Ok(Ok(())) => {
249                            eprintln!("BUG: async worker task ended unexpectedly!");
250                        }
251                        Ok(Err(e)) => {
252                            eprintln!("Error: async worker task ended:\n\t{e:?}");
253                            enqueue_rooms_list_update(RoomsListUpdate::Status {
254                                status: RoomsCollectionStatus::Error(e.to_string()),
255                            });
256                            enqueue_toast_notification(ToastNotificationRequest::new(
257                                format!("Rooms list update error: {e}"),
258                                None,
259                                ToastNotificationVariant::Error,
260                            ));
261                        },
262                        Err(e) => {
263                            eprintln!("BUG: failed to join async worker task: {e:?}");
264                        }
265                    }
266                    break;
267                }
268                _ = ui_event_receiver.recv() => {
269                    #[cfg(debug_assertions)]
270                    println!("Received UI update event");
271                }
272            }
273        }
274    });
275
276    // Return broadcast receiver for the adapter to forward outgoing events
277    broadcast_receiver
278}
279
280// Re-exports
281
282pub use init::login::MatrixClientConfig;
283pub use init::singletons::LOGIN_STORE_READY;
284pub use models::async_requests::*;
285pub use room::notifications::MobilePushNotificationConfig;
286pub use room::room_screen::RoomScreen;
287pub use room::rooms_list::RoomsList;
288pub use stores::login_store::{FrontendSyncServiceState, FrontendVerificationState, LoginState};
289pub use user::user_profile::UserProfileMap;
290
291// The adapter needs some types in those modules
292pub use matrix_sdk::media::MediaRequestParameters;
293pub use matrix_sdk::ruma::{OwnedDeviceId, OwnedRoomId, OwnedUserId};
294pub use tokio::sync::mpsc;
295pub use tokio::sync::oneshot;