use std::{future::ready, sync::Mutex};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk::{SlidingSync, SlidingSyncMode, sliding_sync::Range};
use ruma::time::{Duration, Instant};
use super::Error;
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
#[derive(Clone, Debug, PartialEq)]
pub enum State {
Init,
SettingUp,
Recovering,
Running,
Error { from: Box<State> },
Terminated { from: Box<State> },
}
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
#[derive(Debug)]
pub struct StateMachine {
state: SharedObservable<State>,
last_state_update_time: Mutex<Instant>,
state_lifespan: Duration,
}
impl StateMachine {
pub(super) fn new() -> Self {
StateMachine {
state: SharedObservable::new(State::Init),
last_state_update_time: Mutex::new(Instant::now()),
state_lifespan: DEFAULT_STATE_LIFESPAN,
}
}
pub(super) fn get(&self) -> State {
self.state.get()
}
pub(super) fn cloned_state(&self) -> SharedObservable<State> {
self.state.clone()
}
pub(super) fn set(&self, state: State) {
let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
*last_state_update_time = Instant::now();
self.state.set(state);
}
pub fn subscribe(&self) -> Subscriber<State> {
self.state.subscribe()
}
pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
use State::*;
let next_state = match self.get() {
Init => SettingUp,
SettingUp | Recovering => {
set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
Running
}
Running => {
if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
Recovering
} else {
Running
}
}
Error { from: previous_state } | Terminated { from: previous_state } => {
match previous_state.as_ref() {
Error { .. } | Terminated { .. } => {
unreachable!(
"It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
);
}
Running => {
set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
Recovering
}
state => state.to_owned(),
}
}
};
Ok(next_state)
}
}
async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| {
list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
ready(())
})
.await
.ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
}
async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| {
list.set_sync_mode(
SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
);
ready(())
})
.await
.ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
}
pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
#[cfg(test)]
mod tests {
use matrix_sdk::test_utils::client::MockClientBuilder;
use matrix_sdk_test::async_test;
use tokio::time::sleep;
use super::*;
use crate::RoomListService;
async fn new_room_list() -> Result<RoomListService, Error> {
let client = MockClientBuilder::new(None).build().await;
RoomListService::new(client).await
}
#[async_test]
async fn test_states() -> Result<(), Error> {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();
let state_machine = StateMachine::new();
{
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Init);
}
{
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Init);
}
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
{
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
}
{
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
}
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
{
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
{
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
{
state_machine.set(State::Error { from: Box::new(State::Recovering) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
}
{
state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
}
Ok(())
}
#[async_test]
async fn test_recover_state_after_delay() -> Result<(), Error> {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();
let mut state_machine = StateMachine::new();
state_machine.state_lifespan = Duration::from_millis(50);
{
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
sleep(Duration::from_millis(100)).await;
{
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
sleep(Duration::from_millis(100)).await;
{
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
Ok(())
}
#[async_test]
async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
{
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();
assert_eq!(
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
list.sync_mode(),
SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
)))
.await,
Some(true)
);
set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
assert_eq!(
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
list.sync_mode(),
SlidingSyncMode::Growing {
batch_size, ..
} if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
)))
.await,
Some(true)
);
set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
assert_eq!(
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
list.sync_mode(),
SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
)))
.await,
Some(true)
);
Ok(())
}
}