matrix-ui-serializable 0.4.0

Opinionated abstraction of the matrix-sdk crate with serializable structs
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
#![recursion_limit = "256"]
use std::{path::PathBuf, sync::Arc};

use futures::StreamExt;
use serde::{Serialize, ser::Serializer};
use tokio::{
    runtime::Handle,
    sync::{broadcast, mpsc::unbounded_channel},
};
use tracing::{error, info};
use url::Url;

use crate::{
    init::{
        FrontendAuthTypeResponse, check_homeserver_auth_type,
        session::{setup_token_background_save, try_restore_session_to_state},
        singletons::{
            APP_DATA_DIR, CURRENT_USER_ID, EVENT_BRIDGE, REQUEST_SENDER,
            VERIFICATION_RESPONSE_RECEIVER,
        },
        workers::{async_main_loop, async_worker},
    },
    models::{
        event_bridge::EventBridge,
        events::{
            EmitEvent, MatrixLoginPayload, MatrixUpdateCurrentActiveRoom,
            MatrixVerificationResponse, ToastNotificationRequest, ToastNotificationVariant,
        },
        state_updater::StateUpdater,
    },
    room::{
        notifications::enqueue_toast_notification,
        rooms_list::{RoomsCollectionStatus, RoomsListUpdate, enqueue_rooms_list_update},
    },
};

pub(crate) mod account;
pub mod commands;
pub(crate) mod events;
pub(crate) mod init;
pub mod models;
pub(crate) mod room;
pub(crate) mod stores;
pub(crate) mod user;
pub(crate) mod utils;

/// Crate-wide `Result` alias whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Error type of matrix-ui-serializable.
///
/// Every variant is `#[error(transparent)]`: its `Display` output and `source()`
/// are forwarded to the wrapped error. The `#[from]` attributes generate the
/// matching `From` impls, so `?` can convert the wrapped errors into this enum.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Wraps a [`std::io::Error`].
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Wraps an [`anyhow::Error`].
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
    /// Wraps a [`matrix_sdk::Error`].
    #[error(transparent)]
    MatrixSdk(#[from] matrix_sdk::Error),
}

impl Serialize for Error {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(self.to_string().as_ref())
    }
}

/// Required `mpsc::Receiver`s to listen to incoming events.
///
/// The adapter keeps the matching senders; [`init`] consumes the receivers.
pub struct EventReceivers {
    // Event based
    /// Verification responses; `init` stores this receiver in the
    /// `VERIFICATION_RESPONSE_RECEIVER` singleton.
    verification_response_receiver: mpsc::Receiver<MatrixVerificationResponse>,
    /// Current-active-room updates; `init` hands this receiver to the main async loop.
    room_update_receiver: mpsc::Receiver<MatrixUpdateCurrentActiveRoom>,
    // Command based
    /// Login payloads; `init` awaits one message when the homeserver uses Matrix auth.
    matrix_login_receiver: mpsc::Receiver<MatrixLoginPayload>,
    /// OAuth deep-link URLs; `init` hands this receiver to the OAuth login flow.
    oauth_deeplink_receiver: mpsc::Receiver<Url>,
}

impl EventReceivers {
    pub fn new(
        verification_response_receiver: mpsc::Receiver<MatrixVerificationResponse>,
        room_update_receiver: mpsc::Receiver<MatrixUpdateCurrentActiveRoom>,
        matrix_login_receiver: mpsc::Receiver<MatrixLoginPayload>,
        oauth_deeplink_receiver: mpsc::Receiver<Url>,
    ) -> Self {
        Self {
            verification_response_receiver,
            room_update_receiver,
            matrix_login_receiver,
            oauth_deeplink_receiver,
        }
    }
}

/// The required configuration for this lib, consumed by [`init`].
/// Adapters must implement `updaters` and provide `event_receivers`.
pub struct LibConfig {
    /// The functions that will be in charge of updating the frontend states / stores
    /// from the backend state
    updaters: Box<dyn StateUpdater>,
    /// To listen to events coming from the frontend, we use channels.
    event_receivers: EventReceivers,
    /// The serialized session stored by the adapter, or `None` if there is none.
    /// `init` tries to restore it before falling back on a fresh login.
    session_option: Option<String>,
    /// A PathBuf to the application data directory
    app_data_dir: PathBuf,
    /// The client's URL (for OAuth metadata)
    oauth_client_uri: Url,
    /// A callback URL that will handle the redirection to your app when logging in
    /// with the OAuth flow
    oauth_redirect_uri: Url,
}

impl LibConfig {
    pub fn new(
        updaters: Box<dyn StateUpdater>,
        event_receivers: EventReceivers,
        session_option: Option<String>,
        app_data_dir: PathBuf,
        oauth_client_uri: Url,
        oauth_redirect_uri: Url,
    ) -> Self {
        Self {
            updaters,
            event_receivers,
            session_option,
            app_data_dir,
            oauth_client_uri,
            oauth_redirect_uri,
        }
    }
}

/// Function to be called once your app is starting to init this lib.
/// This will start the workers and return a `Receiver` to forward outgoing events.
pub fn init(mut config: LibConfig) -> broadcast::Receiver<EmitEvent> {
    APP_DATA_DIR
        .set(config.app_data_dir)
        .expect("Couldn't set app data dir");

    // Lib -> adapter events
    let (event_bridge, broadcast_receiver) = EventBridge::new();
    EVENT_BRIDGE
        .set(event_bridge)
        .expect("Couldn't set the event bridge");

    let basic_init_handle = Handle::current().spawn(async move {
        // Adapter -> lib events

        VERIFICATION_RESPONSE_RECEIVER
            .set(tokio::sync::Mutex::new(
                config.event_receivers.verification_response_receiver,
            ))
            .expect("Couldn't set the verification response receiver");

        // Create a channel to be used between UI thread(s) and the async worker thread.
        init::singletons::init_broadcaster(16).expect("Couldn't init the UI broadcaster");

        let (matrix_request_sender, matrix_request_receiver) = unbounded_channel::<MatrixRequest>();
        REQUEST_SENDER
            .set(matrix_request_sender)
            .expect("BUG: REQUEST_SENDER already set!");

        matrix_request_receiver
    });

    let _monitor = Handle::current().spawn(async move {
        let updaters_arc = Arc::new(config.updaters);
        let inner_updaters = updaters_arc.clone();

        // Setup the token refresher thread before first sync or client build.
        setup_token_background_save(inner_updaters.clone());
        let matrix_request_receiver = basic_init_handle.await.expect("couldn't do basic init");

        let client_opt = match try_restore_session_to_state(config.session_option).await {
            Ok(opt) => opt,
            Err(e) => {
                enqueue_toast_notification(ToastNotificationRequest::new(
                    format!("Failed to restore session, falling back on login. Error: {e}"),
                    None,
                    ToastNotificationVariant::Error,
                ));
                None
            }
        };

        let (client, _has_been_restored) = match client_opt {
            Some(restored) => {
                if let Err(e) = &inner_updaters.update_login_state(
                    LoginState::Restored,
                    restored.user_id().map(|v| v.to_string()),
                ) {
                    enqueue_toast_notification(ToastNotificationRequest::new(
                        format!("Cannot update login state. Error: {e}"),
                        None,
                        ToastNotificationVariant::Error,
                    ))
                }
                (restored, true)
            }
            None => {
                LOGIN_STORE_READY.wait();
                if let Err(e) =
                    &inner_updaters.update_login_state(LoginState::AwaitingForHomeserver, None)
                {
                    enqueue_toast_notification(ToastNotificationRequest::new(
                        format!("Cannot update login state. Error: {e}"),
                        None,
                        ToastNotificationVariant::Error,
                    ))
                }
                info!("Waiting for homeserver selection...");

                let client = if let Ok((auth_type, client)) = check_homeserver_auth_type().await {
                    let serialized_session = match auth_type {
                        FrontendAuthTypeResponse::Oauth => init::oauth::register_and_login_oauth(
                            &client,
                            config.event_receivers.oauth_deeplink_receiver,
                            &config.oauth_client_uri,
                            &config.oauth_redirect_uri,
                        )
                        .await
                        .expect("Failed to login with OAuth"),
                        FrontendAuthTypeResponse::Matrix => {
                            // wait for frontend payload
                            let login_payload = config
                                .event_receivers
                                .matrix_login_receiver
                                .recv()
                                .await
                                .expect("no login sender to listen to");
                            init::login::login_and_persist_matrix_session(
                                &client,
                                login_payload.username.clone(),
                                login_payload.password.clone(),
                                login_payload.client_name.clone(),
                            )
                            .await
                            .expect("Failed to login with Matrix Auth")
                        }
                        FrontendAuthTypeResponse::WrongUrl => {
                            enqueue_toast_notification(ToastNotificationRequest::new(
                                "The homeserver URL is incorrect".to_owned(),
                                Some("Please restart the app and try another one.".to_owned()),
                                ToastNotificationVariant::Error,
                            ));
                            "".to_owned()
                        }
                    };

                    if let Err(e) = &inner_updaters
                        .persist_login_session(serialized_session)
                        .await
                    {
                        enqueue_toast_notification(ToastNotificationRequest::new(
                            format!("Failed to persist login session. Error: {e}"),
                            None,
                            ToastNotificationVariant::Error,
                        ));
                    }

                    client
                } else {
                    panic!("Unknown homeserver auth type")
                };

                (client, false)
            }
        };

        CURRENT_USER_ID
            .set(client.user_id().unwrap().to_owned())
            .expect("Couldn't set CURRENT_USER_ID singleton");

        let user_avatar = client.account().get_avatar_url().await.map_or(None, |a| a);

        let user_display_name = client
            .account()
            .get_display_name()
            .await
            .map_or(None, |n| n);

        let device_name = client
            .encryption()
            .get_own_device()
            .await
            .ok()
            .flatten()
            .and_then(|d| d.display_name().map(|s| s.to_owned()));

        if let Err(e) = inner_updaters.update_current_user_info(
            Some(CURRENT_USER_ID.get().unwrap().to_owned()),
            user_avatar,
            user_display_name,
            device_name,
        ) {
            enqueue_toast_notification(ToastNotificationRequest::new(
                format!("Cannot update current user info. Error: {e}"),
                None,
                ToastNotificationVariant::Error,
            ))
        }

        // Update frontend login state
        if let Err(e) = &inner_updaters.update_login_state(
            LoginState::LoggedIn,
            CURRENT_USER_ID.get().map(|u| u.to_string()),
        ) {
            error!("Cannot update frontend login store. {e}");
            enqueue_toast_notification(ToastNotificationRequest::new(
                format!("Cannot update login state. Error: {e}"),
                None,
                ToastNotificationVariant::Error,
            ))
        }

        let mut verification_subscriber = client.encryption().verification_state();

        let verification_state_updaters = inner_updaters.clone();
        tokio::task::spawn(async move {
            while let Some(state) = verification_subscriber.next().await {
                if let Err(e) = verification_state_updaters
                    .update_verification_state(FrontendVerificationState::new(state))
                {
                    enqueue_toast_notification(ToastNotificationRequest::new(
                        format!("Cannot update verification store. Error: {e}"),
                        None,
                        ToastNotificationVariant::Error,
                    ))
                }
            }
        });

        let mut state_stream = client.encryption().recovery().state_stream();
        let recovery_state_updaters = inner_updaters.clone();
        tokio::task::spawn(async move {
            while let Some(update) = state_stream.next().await {
                recovery_state_updaters
                    .update_recovery_state(update)
                    .expect("couldn't update frontend recovery state")
            }
        });

        let mut ui_event_receiver =
            init::singletons::subscribe_to_events().expect("Couldn't get UI event receiver"); // subscribe to events so the sender(s) never fail

        // Spawn the actual async worker thread.
        let mut worker_join_handle = Handle::current().spawn(async_worker(matrix_request_receiver));

        // // Start the main loop that drives the Matrix client SDK.
        let mut main_loop_join_handle = Handle::current().spawn(async_main_loop(
            client,
            updaters_arc,
            config.event_receivers.room_update_receiver,
        ));

        LOGIN_STORE_READY.wait();

        #[allow(clippy::never_loop)] // unsure if needed, just following tokio's examples.
        loop {
            tokio::select! {
                result = &mut main_loop_join_handle => {
                    match result {
                        Ok(Ok(())) => {
                            error!("BUG: main async loop task ended unexpectedly!");
                        }
                        Ok(Err(e)) => {
                            error!("Error: main async loop task ended:\n\t{e:?}");
                            enqueue_rooms_list_update(RoomsListUpdate::Status {
                                status: RoomsCollectionStatus::Error(e.to_string()),
                            });
                            enqueue_toast_notification(ToastNotificationRequest::new(
                                format!("Rooms list update error: {e}"),
                                None,
                                ToastNotificationVariant::Error,
                            ));
                        },
                        Err(e) => {
                            error!("BUG: failed to join main async loop task: {e:?}");
                        }
                    }
                    break;
                }
                result = &mut worker_join_handle => {
                    match result {
                        Ok(Ok(())) => {
                            error!("BUG: async worker task ended unexpectedly!");
                        }
                        Ok(Err(e)) => {
                            error!("Error: async worker task ended:\n\t{e:?}");
                            enqueue_rooms_list_update(RoomsListUpdate::Status {
                                status: RoomsCollectionStatus::Error(e.to_string()),
                            });
                            enqueue_toast_notification(ToastNotificationRequest::new(
                                format!("Rooms list update error: {e}"),
                                None,
                                ToastNotificationVariant::Error,
                            ));
                        },
                        Err(e) => {
                            error!("BUG: failed to join async worker task: {e:?}");
                        }
                    }
                    break;
                }
                _ = ui_event_receiver.recv() => {
                    #[cfg(debug_assertions)]
                    tracing::trace!("Received UI update event");
                }
            }
        }
    });

    // Return broadcast receiver for the adapter to forward outgoing events
    broadcast_receiver
}

// Re-exports

pub use init::session::FullMatrixSession;
pub use init::singletons::{CLIENT, LOGIN_STORE_READY};
pub use models::async_requests::*;
pub use room::frontend_events::events_dto::FrontendTimelineItem;
pub use room::room_screen::RoomScreen;
pub use room::rooms_list::RoomsList;
pub use stores::login_store::{FrontendSyncServiceState, FrontendVerificationState, LoginState};
pub use user::user_profile::UserProfile;
// The adapter needs some types in those modules
pub use matrix_sdk::AuthSession;
pub use matrix_sdk::attachment::{
    AttachmentInfo, BaseAudioInfo, BaseFileInfo, BaseImageInfo, BaseVideoInfo, Thumbnail,
};
pub use matrix_sdk::encryption::recovery::RecoveryState;
pub use matrix_sdk::media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings};
pub use matrix_sdk::ruma::serde::base64::{Base64, Standard, UrlSafe};
pub use matrix_sdk::ruma::{
    OwnedDeviceId, OwnedMxcUri, OwnedRoomId, OwnedUserId, UInt,
    events::room::{
        EncryptedFile, EncryptedFileHashes, EncryptedFileInfo, MediaSource, V2EncryptedFileInfo,
    },
    media::Method,
};
pub use tokio::sync::mpsc;
pub use tokio::sync::oneshot;