use std::{fmt::Debug, sync::Arc};
use anyhow::{bail, Context};
use futures_core::stream::Stream;
use matrix_sdk_base::deserialized_responses::SyncResponse;
use ruma::{
api::client::sync::sync_events::v4,
assign,
events::{AnySyncTimelineEvent, RoomEventType},
serde::Raw,
OwnedRoomId, RoomId, UInt,
};
use url::Url;
use crate::{Client, Result};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlidingSyncState {
Cold,
Preload,
CatchingUp,
Live,
}
impl Default for SlidingSyncState {
fn default() -> Self {
SlidingSyncState::Cold
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlidingSyncMode {
FullSync,
Selective,
}
impl Default for SlidingSyncMode {
fn default() -> Self {
SlidingSyncMode::FullSync
}
}
#[derive(Clone, Debug)]
pub enum RoomListEntry {
Empty,
Invalidated(OwnedRoomId),
Filled(OwnedRoomId),
}
impl RoomListEntry {
pub fn empty_or_invalidated(&self) -> bool {
matches!(self, RoomListEntry::Empty | RoomListEntry::Invalidated(_))
}
pub fn as_room_id(&self) -> Option<&RoomId> {
match &self {
RoomListEntry::Empty => None,
RoomListEntry::Invalidated(b) | RoomListEntry::Filled(b) => Some(b.as_ref()),
}
}
}
pub type AliveRoomTimeline =
Arc<futures_signals::signal_vec::MutableVec<Raw<AnySyncTimelineEvent>>>;
impl Default for RoomListEntry {
fn default() -> Self {
RoomListEntry::Empty
}
}
#[derive(Debug, Clone)]
pub struct SlidingSyncRoom {
room_id: OwnedRoomId,
inner: v4::SlidingSyncRoom,
is_loading_more: futures_signals::signal::Mutable<bool>,
prev_batch: futures_signals::signal::Mutable<Option<String>>,
timeline: AliveRoomTimeline,
}
impl SlidingSyncRoom {
fn from(room_id: OwnedRoomId, mut inner: v4::SlidingSyncRoom) -> Self {
let v4::SlidingSyncRoom { timeline, .. } = inner;
inner.timeline = vec![];
Self {
room_id,
is_loading_more: futures_signals::signal::Mutable::new(false),
prev_batch: futures_signals::signal::Mutable::new(inner.prev_batch.clone()),
timeline: Arc::new(futures_signals::signal_vec::MutableVec::new_with_values(timeline)),
inner,
}
}
pub fn room_id(&self) -> &OwnedRoomId {
&self.room_id
}
pub fn is_loading_more(&self) -> bool {
*self.is_loading_more.lock_ref()
}
pub fn prev_batch(&self) -> Option<String> {
self.prev_batch.lock_ref().clone()
}
pub fn timeline(&self) -> AliveRoomTimeline {
self.timeline.clone()
}
pub fn name(&self) -> Option<&str> {
self.inner.name.as_deref()
}
fn update(&mut self, room_data: &v4::SlidingSyncRoom) {
let v4::SlidingSyncRoom {
name,
initial,
is_dm,
invite_state,
unread_notifications,
required_state,
prev_batch,
timeline,
..
} = room_data;
self.inner.unread_notifications = unread_notifications.clone();
if name.is_some() {
self.inner.name = name.clone();
}
if initial.is_some() {
self.inner.initial = *initial;
}
if is_dm.is_some() {
self.inner.is_dm = *is_dm;
}
if !invite_state.is_empty() {
self.inner.invite_state = invite_state.clone();
}
if !required_state.is_empty() {
self.inner.required_state = required_state.clone();
}
if let Some(batch) = prev_batch {
self.prev_batch.lock_mut().replace(batch.clone());
}
if !timeline.is_empty() {
let mut ref_timeline = self.timeline.lock_mut();
for e in timeline {
ref_timeline.push_cloned(e.clone());
}
}
}
}
impl std::ops::Deref for SlidingSyncRoom {
type Target = v4::SlidingSyncRoom;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
type ViewState = futures_signals::signal::Mutable<SlidingSyncState>;
type SyncMode = futures_signals::signal::Mutable<SlidingSyncMode>;
type PosState = futures_signals::signal::Mutable<Option<String>>;
type RangeState = futures_signals::signal::Mutable<Vec<(UInt, UInt)>>;
type RoomsCount = futures_signals::signal::Mutable<Option<u32>>;
type RoomsList = Arc<futures_signals::signal_vec::MutableVec<RoomListEntry>>;
type RoomsMap = Arc<futures_signals::signal_map::MutableBTreeMap<OwnedRoomId, SlidingSyncRoom>>;
type RoomsSubscriptions =
Arc<futures_signals::signal_map::MutableBTreeMap<OwnedRoomId, v4::RoomSubscription>>;
type RoomUnsubscribe = Arc<futures_signals::signal_vec::MutableVec<OwnedRoomId>>;
type ViewsList = Arc<futures_signals::signal_vec::MutableVec<SlidingSyncView>>;
use derive_builder::Builder;
#[derive(Debug, Clone)]
pub struct UpdateSummary {
pub views: Vec<String>,
pub rooms: Vec<OwnedRoomId>,
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", derive(Clone, Debug))]
pub struct SlidingSync {
#[builder(setter(strip_option))]
homeserver: Option<Url>,
#[builder(private)]
client: Client,
#[builder(private, default)]
pos: PosState,
#[builder(private, default)]
pub views: ViewsList,
#[builder(private, default)]
subscriptions: RoomsSubscriptions,
#[builder(private, default)]
unsubscribe: RoomUnsubscribe,
#[builder(private, default)]
rooms: RoomsMap,
}
impl SlidingSyncBuilder {
pub fn add_fullsync_view(mut self) -> Self {
let views = self.views.clone().unwrap_or_default();
views.lock_mut().push_cloned(
SlidingSyncViewBuilder::default_with_fullsync()
.build()
.expect("Building default full sync view doesn't fail"),
);
self.views = Some(views);
self
}
pub fn no_views(mut self) -> Self {
self.views = None;
self
}
pub fn add_view(mut self, mut v: SlidingSyncView) -> Self {
let rooms = self.rooms.clone().unwrap_or_default();
self.rooms = Some(rooms.clone());
v.rooms = rooms;
let views = self.views.clone().unwrap_or_default();
views.lock_mut().push_cloned(v);
self.views = Some(views);
self
}
}
impl SlidingSync {
pub fn new_builder_copy(&self) -> SlidingSyncBuilder {
let builder = SlidingSyncBuilder::default()
.client(self.client.clone())
.views(Arc::new(futures_signals::signal_vec::MutableVec::new_with_values(
self.views
.lock_ref()
.to_vec()
.iter()
.map(|v| {
v.new_builder().build().expect("builder worked before, builder works now")
})
.collect(),
)))
.subscriptions(Arc::new(futures_signals::signal_map::MutableBTreeMap::with_values(
self.subscriptions.lock_ref().to_owned(),
)));
if let Some(h) = &self.homeserver {
builder.homeserver(h.clone())
} else {
builder
}
}
pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option<v4::RoomSubscription>) {
self.subscriptions.lock_mut().insert_cloned(room_id, settings.unwrap_or_default());
}
pub fn unsubscribe(&self, room_id: OwnedRoomId) {
if self.subscriptions.lock_mut().remove(&room_id).is_some() {
self.unsubscribe.lock_mut().push_cloned(room_id);
}
}
pub fn get_room(&self, room_id: OwnedRoomId) -> Option<SlidingSyncRoom> {
self.rooms.lock_ref().get(&room_id).cloned()
}
pub fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
&self,
room_ids: I,
) -> Vec<Option<SlidingSyncRoom>> {
let rooms = self.rooms.lock_ref();
room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
}
async fn handle_response(
&self,
resp: v4::Response,
views: &[SlidingSyncView],
) -> anyhow::Result<UpdateSummary> {
self.client.process_sliding_sync(resp.clone()).await?;
tracing::info!("main client processed.");
self.pos.replace(Some(resp.pos));
let mut updated_views = Vec::new();
if resp.lists.len() != views.len() {
bail!("Received response for {} lists, yet we have {}", resp.lists.len(), views.len());
}
for (view, updates) in std::iter::zip(views, &resp.lists) {
let count: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
tracing::trace!("view {:?} update: {:?}", view.name, !updates.ops.is_empty());
if !updates.ops.is_empty() && view.handle_response(count, &updates.ops)? {
updated_views.push(view.name.clone());
}
}
let mut rooms = Vec::new();
let mut rooms_map = self.rooms.lock_mut();
for (id, room_data) in resp.rooms.iter() {
if let Some(mut r) = rooms_map.remove(id) {
r.update(room_data);
rooms_map.insert_cloned(id.clone(), r);
rooms.push(id.clone());
} else {
rooms_map.insert_cloned(
id.clone(),
SlidingSyncRoom::from(id.clone(), room_data.clone()),
);
rooms.push(id.clone());
}
}
Ok(UpdateSummary { views: updated_views, rooms })
}
pub async fn stream<'a>(
&self,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<UpdateSummary>> + '_> {
let views = self.views.lock_ref().to_vec();
let _pos = self.pos.clone();
let client = self.client.clone();
Ok(async_stream::try_stream! {
let mut remaining_views = views.clone();
let mut remaining_generators: Vec<SlidingSyncViewRequestGenerator<'_>> = views
.iter()
.map(SlidingSyncView::request_generator)
.collect();
loop {
let mut requests = Vec::new();
let mut new_remaining_generators = Vec::new();
let mut new_remaining_views = Vec::new();
for (mut generator, view) in std::iter::zip(remaining_generators, remaining_views) {
if let Some(request) = generator.next() {
requests.push(request);
new_remaining_generators.push(generator);
new_remaining_views.push(view);
}
}
if new_remaining_views.is_empty() {
return
}
remaining_views = new_remaining_views;
remaining_generators = new_remaining_generators;
let pos = self.pos.get_cloned();
let room_subscriptions = self.subscriptions.lock_ref().clone();
let unsubscribe_rooms = {
let unsubs = self.unsubscribe.lock_ref().to_vec();
if !unsubs.is_empty() {
self.unsubscribe.lock_mut().clear();
}
unsubs
};
let req = assign!(v4::Request::new(), {
lists: &requests,
pos: pos.as_deref(),
room_subscriptions,
unsubscribe_rooms: &unsubscribe_rooms,
});
tracing::debug!("requesting");
let resp = client
.send_with_homeserver(req, None, self.homeserver.as_ref().map(ToString::to_string))
.await?;
tracing::debug!("received");
let updates = self.handle_response(resp, &remaining_views).await?;
tracing::debug!("handled");
yield updates;
}
})
}
}
#[derive(Clone, Debug, Builder)]
#[builder(build_fn(name = "finish_build"), pattern = "owned", derive(Clone, Debug))]
pub struct SlidingSyncView {
#[builder(setter(custom), default)]
sync_mode: SyncMode,
#[builder(default = "SlidingSyncViewBuilder::default_sort()")]
sort: Vec<String>,
#[builder(default = "SlidingSyncViewBuilder::default_required_state()")]
required_state: Vec<(RoomEventType, String)>,
#[builder(default = "20")]
batch_size: u32,
#[builder(default)]
filters: Option<v4::SyncRequestListFilters>,
#[builder(setter(name = "timeline_limit_raw"), default)]
timeline_limit: Option<UInt>,
#[builder(setter(into))]
pub name: String,
#[builder(private, default)]
pub state: ViewState,
#[builder(private, default)]
pub rooms_count: RoomsCount,
#[builder(private, default)]
pub rooms_list: RoomsList,
#[builder(private, default)]
pub rooms: RoomsMap,
#[builder(setter(name = "ranges_raw"), default)]
ranges: RangeState,
#[builder(private)]
rooms_updated_signal: futures_signals::signal::Sender<()>,
#[builder(private)]
pub rooms_updated_broadcaster:
futures_signals::signal::Broadcaster<futures_signals::signal::Receiver<()>>,
}
pub const FULL_SYNC_VIEW_NAME: &str = "full-sync";
impl SlidingSyncViewBuilder {
pub fn default_with_fullsync() -> Self {
Self::default().name(FULL_SYNC_VIEW_NAME).sync_mode(SlidingSyncMode::FullSync)
}
pub fn build(mut self) -> Result<SlidingSyncView, SlidingSyncViewBuilderError> {
let (sender, receiver) = futures_signals::signal::channel(());
self.rooms_updated_signal = Some(sender);
self.rooms_updated_broadcaster = Some(futures_signals::signal::Broadcaster::new(receiver));
self.finish_build()
}
fn default_sort() -> Vec<String> {
vec!["by_recency".to_owned(), "by_name".to_owned()]
}
fn default_required_state() -> Vec<(RoomEventType, String)> {
vec![
(RoomEventType::RoomEncryption, "".to_owned()),
(RoomEventType::RoomTombstone, "".to_owned()),
]
}
pub fn sync_mode(mut self, sync_mode: SlidingSyncMode) -> Self {
self.sync_mode = Some(SyncMode::new(sync_mode));
self
}
pub fn ranges<U: Into<UInt>>(mut self, range: Vec<(U, U)>) -> Self {
self.ranges =
Some(RangeState::new(range.into_iter().map(|(a, b)| (a.into(), b.into())).collect()));
self
}
pub fn add_range<U: Into<UInt>>(mut self, from: U, to: U) -> Self {
let r = self.ranges.get_or_insert_with(|| RangeState::new(Vec::new()));
r.lock_mut().push((from.into(), to.into()));
self
}
pub fn reset_ranges(mut self) -> Self {
self.ranges = None;
self
}
pub fn timeline_limit<U: Into<UInt>>(mut self, timeline_limit: U) -> Self {
self.timeline_limit = Some(Some(timeline_limit.into()));
self
}
pub fn no_timeline_limit(mut self) -> Self {
self.timeline_limit = None;
self
}
}
enum InnerSlidingSyncViewRequestGenerator {
FullSync { position: u32, batch_size: u32 },
Live,
}
struct SlidingSyncViewRequestGenerator<'a> {
view: &'a SlidingSyncView,
inner: InnerSlidingSyncViewRequestGenerator,
}
impl<'a> SlidingSyncViewRequestGenerator<'a> {
fn new_with_syncup(view: &'a SlidingSyncView) -> Self {
let batch_size = view.batch_size;
SlidingSyncViewRequestGenerator {
view,
inner: InnerSlidingSyncViewRequestGenerator::FullSync { position: 0, batch_size },
}
}
fn new_live(view: &'a SlidingSyncView) -> Self {
SlidingSyncViewRequestGenerator { view, inner: InnerSlidingSyncViewRequestGenerator::Live }
}
fn prefetch_request(&self, start: u32, batch_size: u32) -> (u32, v4::SyncRequestList) {
let end = start + batch_size;
let ranges = vec![(start.into(), end.into())];
(end, self.make_request_for_ranges(ranges))
}
fn make_request_for_ranges(&self, ranges: Vec<(UInt, UInt)>) -> v4::SyncRequestList {
let sort = self.view.sort.clone();
let required_state = self.view.required_state.clone();
let timeline_limit = self.view.timeline_limit;
let filters = self.view.filters.clone();
assign!(v4::SyncRequestList::default(), {
ranges,
required_state,
sort,
timeline_limit,
filters,
})
}
fn live_request(&self) -> v4::SyncRequestList {
let ranges = self.view.ranges.read_only().get_cloned();
self.make_request_for_ranges(ranges)
}
}
impl<'a> Iterator for SlidingSyncViewRequestGenerator<'a> {
type Item = v4::SyncRequestList;
fn next(&mut self) -> Option<Self::Item> {
if let InnerSlidingSyncViewRequestGenerator::FullSync { position, .. } = self.inner {
if let Some(count) = self.view.rooms_count.get_cloned() {
if count <= position {
self.view.state.set_if(SlidingSyncState::Live, |before, _now| {
*before == SlidingSyncState::CatchingUp
});
self.view.set_range(0, count);
self.inner = InnerSlidingSyncViewRequestGenerator::Live
}
} else {
self.view.state.set_if(SlidingSyncState::Preload, |before, _now| {
*before == SlidingSyncState::Cold
});
}
}
match self.inner {
InnerSlidingSyncViewRequestGenerator::FullSync { position, batch_size } => {
let (end, req) = self.prefetch_request(position, batch_size);
self.inner =
InnerSlidingSyncViewRequestGenerator::FullSync { position: end, batch_size };
self.view.state.set_if(SlidingSyncState::CatchingUp, |before, _now| {
*before == SlidingSyncState::Preload
});
Some(req)
}
InnerSlidingSyncViewRequestGenerator::Live => Some(self.live_request()),
}
}
}
impl SlidingSyncView {
pub fn new_builder(&self) -> SlidingSyncViewBuilder {
SlidingSyncViewBuilder::default()
.name(&self.name)
.sync_mode(self.sync_mode.lock_ref().clone())
.sort(self.sort.clone())
.required_state(self.required_state.clone())
.batch_size(self.batch_size)
.ranges(self.ranges.read_only().get_cloned())
}
pub fn set_ranges(&self, range: Vec<(u32, u32)>) -> &Self {
*self.ranges.lock_mut() = range.into_iter().map(|(a, b)| (a.into(), b.into())).collect();
self
}
pub fn set_range(&self, start: u32, end: u32) -> &Self {
*self.ranges.lock_mut() = vec![(start.into(), end.into())];
self
}
pub fn add_range(&self, start: u32, end: u32) -> &Self {
self.ranges.lock_mut().push((start.into(), end.into()));
self
}
pub fn reset_ranges(&self) -> &Self {
self.ranges.lock_mut().clear();
self
}
pub fn get_rooms(
&self,
offset: Option<usize>,
count: Option<usize>,
) -> Vec<v4::SlidingSyncRoom> {
let start = offset.unwrap_or(0);
let rooms = self.rooms.lock_ref();
let listing = self.rooms_list.lock_ref();
let count = count.unwrap_or(listing.len() - start);
listing
.iter()
.skip(start)
.filter_map(|id| id.as_room_id())
.filter_map(|id| rooms.get(id))
.map(|r| r.inner.clone())
.take(count)
.collect()
}
pub fn get_room_id(&self, index: usize) -> Option<OwnedRoomId> {
self.rooms_list.lock_ref().get(index).and_then(|e| e.as_room_id().map(ToOwned::to_owned))
}
#[tracing::instrument(skip(self, ops))]
fn room_ops(&self, ops: &Vec<v4::SyncOp>) -> anyhow::Result<()> {
let mut rooms_list = self.rooms_list.lock_mut();
let _rooms_map = self.rooms.lock_mut();
for op in ops {
match &op.op {
v4::SlidingOp::Sync => {
let start: u32 = op
.range
.context("`range` must be present for Sync and Update operation")?
.0
.try_into()?;
let room_ids = op.room_ids.clone();
room_ids
.into_iter()
.enumerate()
.map(|(i, r)| {
let idx = start as usize + i;
rooms_list.set_cloned(idx, RoomListEntry::Filled(r));
})
.count();
}
v4::SlidingOp::Delete => {
let pos: u32 = op
.index
.context("`index` must be present for DELETE operation")?
.try_into()?;
rooms_list.set_cloned(pos as usize, RoomListEntry::Empty);
}
v4::SlidingOp::Insert => {
let pos: usize = op
.index
.context("`index` must be present for INSERT operation")?
.try_into()?;
let sliced = rooms_list.as_slice();
let room = RoomListEntry::Filled(
op.room_id
.clone()
.context("`room_id` must be present for INSERT operation")?,
);
let mut dif = 0usize;
loop {
let (prev_p, prev_overflow) = pos.overflowing_sub(dif);
let check_prev = !prev_overflow;
let (next_p, overflown) = pos.overflowing_add(dif);
let check_after = !overflown && next_p < sliced.len();
if !check_prev && !check_after {
bail!("We were asked to insert but could not find any direction to shift to");
}
if check_prev && sliced[prev_p].empty_or_invalidated() {
rooms_list.remove(prev_p);
break;
} else if check_after && sliced[next_p].empty_or_invalidated() {
rooms_list.remove(next_p);
break;
} else {
dif += 1;
}
}
rooms_list.insert_cloned(pos, room);
}
v4::SlidingOp::Invalidate => {
let max_len = rooms_list.len();
let (mut pos, end): (u32, u32) = if let Some(range) = op.range {
(range.0.try_into()?, range.1.try_into()?)
} else {
bail!("`range` must be given on `Invalidate` operation")
};
if pos > end {
bail!("Invalid invalidation, end smaller than start");
}
while pos < end {
if pos as usize >= max_len {
break; }
let idx = pos as usize;
let entry = if let Some(RoomListEntry::Filled(b)) = rooms_list.get(idx) {
Some(b.clone())
} else {
None
};
if let Some(b) = entry {
rooms_list.set_cloned(pos as usize, RoomListEntry::Invalidated(b));
} else {
rooms_list.set_cloned(pos as usize, RoomListEntry::Empty);
}
pos += 1;
}
}
s => {
tracing::warn!("Unknown operation occurred: {:?}", s);
}
}
}
Ok(())
}
#[tracing::instrument(skip(self, ops))]
fn handle_response(&self, rooms_count: u32, ops: &Vec<v4::SyncOp>) -> anyhow::Result<bool> {
let mut missing =
rooms_count.checked_sub(self.rooms_list.lock_ref().len() as u32).unwrap_or_default();
let mut changed = false;
if missing > 0 {
let mut list = self.rooms_list.lock_mut();
list.reserve_exact(missing as usize);
while missing > 0 {
list.push_cloned(RoomListEntry::Empty);
missing -= 1;
}
self.rooms_count.replace(Some(rooms_count));
changed = true;
}
if !ops.is_empty() {
self.room_ops(ops)?;
changed = true;
}
if changed {
if let Err(e) = self.rooms_updated_signal.send(()) {
tracing::warn!("Could not inform about rooms updated: {:?}", e);
}
}
Ok(changed)
}
fn request_generator(&self) -> SlidingSyncViewRequestGenerator<'_> {
match self.sync_mode.read_only().get_cloned() {
SlidingSyncMode::FullSync => SlidingSyncViewRequestGenerator::new_with_syncup(self),
SlidingSyncMode::Selective => SlidingSyncViewRequestGenerator::new_live(self),
}
}
}
impl Client {
pub async fn sliding_sync(&self) -> SlidingSyncBuilder {
let _ = self.server_versions().await;
SlidingSyncBuilder::default().client(self.clone())
}
#[tracing::instrument(skip(self, response))]
pub(crate) async fn process_sliding_sync(
&self,
response: v4::Response,
) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
tracing::info!("done processing on base_client");
self.handle_sync_response(response).await
}
}