use std::sync::Arc;
use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
use imbl::Vector;
use matrix_sdk_base::{deserialized_responses::SyncOrStrippedState, event_cache::Event};
use matrix_sdk_common::locks::Mutex;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
events::{
AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncStateEvent,
beacon::OriginalSyncBeaconEvent,
beacon_info::{BeaconInfoEventContent, OriginalSyncBeaconInfoEvent},
location::LocationContent,
relation::RelationType,
},
};
use super::Room;
use crate::event_handler::EventHandlerDropGuard;
/// A single location sample attached to a live location share.
///
/// Within this file it is built from a beacon event's `content.location` and
/// `origin_server_ts`.
#[derive(Clone, Debug)]
pub struct LastLocation {
/// The location payload taken from the beacon event.
pub location: LocationContent,
/// The `origin_server_ts` of the beacon event that supplied `location`.
pub ts: MilliSecondsSinceUnixEpoch,
}
/// One entry of the observed list: a beacon-info state event that reported
/// itself as live, together with the last location known for it, if any.
#[derive(Clone, Debug)]
pub struct LiveLocationShare {
/// Taken from the `state_key` of the beacon-info state event.
pub user_id: OwnedUserId,
/// The latest location found or received for this share; `None` when no
/// location could be looked up and none has been received yet.
pub last_location: Option<LastLocation>,
/// Event ID of the beacon-info state event. Incoming beacon events are
/// matched to a share by comparing their `relates_to.event_id` with this ID.
pub beacon_id: OwnedEventId,
/// The content of the beacon-info state event.
pub beacon_info: BeaconInfoEventContent,
}
/// A beacon-info event content bundled with a room ID and an event ID.
///
/// NOTE(review): this type is not constructed or consumed anywhere in the
/// visible part of this file; the field descriptions below are inferred from
/// names and types only — confirm against its users.
#[derive(Clone, Debug)]
pub struct BeaconInfoUpdate {
/// Presumably the room the beacon-info event belongs to.
pub room_id: OwnedRoomId,
/// Presumably the ID of the beacon-info event.
pub event_id: OwnedEventId,
/// The beacon-info event content.
pub content: BeaconInfoEventContent,
}
/// Maintains an observable list of live location shares for a room and hands
/// out subscriptions to it via `subscribe`.
#[derive(Debug)]
pub struct LiveLocationsObserver {
/// The list of shares. It is shared with the two event handlers registered
/// in `new`, which are the only code in this file that mutates it.
shares: Arc<Mutex<ObservableVector<LiveLocationShare>>>,
/// Drop guard for the beacon (location update) event handler. It is never
/// read; it is only kept alive alongside the observer.
/// NOTE(review): presumably the handler is removed when this is dropped —
/// confirm against `event_handler_drop_guard`.
_beacon_guard: EventHandlerDropGuard,
/// Drop guard for the beacon-info (share start/stop) event handler; same
/// role as `_beacon_guard`.
_beacon_info_guard: EventHandlerDropGuard,
}
impl LiveLocationsObserver {
/// Builds an observer for `room`.
///
/// The list is seeded from the room's current beacon-info state; if loading
/// that state fails, the observer starts with an empty list. Two event
/// handlers are then registered on the room — one for beacon events and one
/// for beacon-info events — and their drop guards are stored in the returned
/// observer.
pub(super) async fn new(room: Room) -> Self {
    // An error while loading the initial state collapses to an empty list.
    let seed = Self::get_initial_live_location_shares(&room).await.unwrap_or_default();
    let mut observable = ObservableVector::new();
    observable.append(seed);
    let shares = Arc::new(Mutex::new(observable));

    // Each handler owns its own reference to the shared list.
    let beacon_shares = Arc::clone(&shares);
    let beacon_handle = room.add_event_handler(async move |event: OriginalSyncBeaconEvent| {
        Self::handle_beacon_event(&beacon_shares, event);
    });
    let _beacon_guard = room.client.event_handler_drop_guard(beacon_handle);

    let beacon_info_shares = Arc::clone(&shares);
    let beacon_info_handle = room.add_event_handler(
        async move |event: OriginalSyncBeaconInfoEvent, room: Room| {
            Self::handle_beacon_info_event(&beacon_info_shares, &room, event).await;
        },
    );
    let _beacon_info_guard = room.client.event_handler_drop_guard(beacon_info_handle);

    Self { shares, _beacon_guard, _beacon_info_guard }
}
/// Subscribes to the list of live location shares.
///
/// Returns the current contents of the list together with a batched stream
/// of subsequent updates. The list's lock is held while both are produced.
pub fn subscribe(
    &self,
) -> (Vector<LiveLocationShare>, VectorSubscriberBatchedStream<LiveLocationShare>) {
    let locked = self.shares.lock();
    let subscriber = locked.subscribe();
    subscriber.into_values_and_batched_stream()
}
/// Collects the shares that are live according to the room's current
/// beacon-info state events.
///
/// State events that fail to deserialize, or that `extract_live_beacon_info`
/// rejects, are skipped. For every remaining event the last known location is
/// looked up through the room's event cache; when the event cache cannot be
/// obtained, every share is returned with `last_location: None`.
///
/// # Errors
///
/// Fails only if reading the beacon-info state events fails.
async fn get_initial_live_location_shares(
    room: &Room,
) -> crate::Result<Vector<LiveLocationShare>> {
    let raw_states = room.get_state_events_static::<BeaconInfoEventContent>().await?;
    // The event cache is optional here: without it we simply have no locations.
    let maybe_cache = room.event_cache().await.ok();

    let mut live_shares = Vector::new();
    for raw_state in raw_states {
        let Some((user_id, beacon_info, beacon_id)) =
            raw_state.deserialize().ok().and_then(Self::extract_live_beacon_info)
        else {
            continue;
        };

        let last_location = if let Some((room_cache, _drop_handles)) = &maybe_cache {
            Self::find_last_location(room_cache, &beacon_id).await
        } else {
            None
        };

        live_shares.push_back(LiveLocationShare { user_id, last_location, beacon_id, beacon_info });
    }

    Ok(live_shares)
}
/// Pulls `(state_key, content, event_id)` out of a beacon-info state event.
///
/// Returns `None` unless the event is an original (non-redacted) sync state
/// event whose content reports `is_live()`; stripped-state events are
/// rejected as well.
fn extract_live_beacon_info(
    event: SyncOrStrippedState<BeaconInfoEventContent>,
) -> Option<(OwnedUserId, BeaconInfoEventContent, OwnedEventId)> {
    match event {
        SyncOrStrippedState::Sync(SyncStateEvent::Original(original))
            if original.content.is_live() =>
        {
            Some((original.state_key, original.content, original.event_id))
        }
        _ => None,
    }
}
/// Looks up a location for the beacon-info event `beacon_info_event_id` in
/// the event cache.
///
/// The events related to it through a `Reference` relation are walked from
/// last to first, and the first one that converts via
/// `event_to_last_location` wins. Returns `None` when the lookup fails or no
/// related event converts.
///
/// NOTE(review): walking in reverse presumably visits the newest event first;
/// that relies on the order returned by `find_event_relations` — confirm.
async fn find_last_location(
    cache: &crate::event_cache::RoomEventCache,
    beacon_info_event_id: &OwnedEventId,
) -> Option<LastLocation> {
    let only_references = Some(vec![RelationType::Reference]);
    let related_events =
        cache.find_event_relations(beacon_info_event_id, only_references).await.ok()?;

    for related in related_events.into_iter().rev() {
        if let Some(location) = Self::event_to_last_location(&related) {
            return Some(location);
        }
    }
    None
}
/// Converts a cached event into a `LastLocation`.
///
/// Returns `None` if the raw event does not deserialize, is not a beacon
/// message-like event, or is not in its original form (`as_original()`
/// returns `None`).
fn event_to_last_location(event: &Event) -> Option<LastLocation> {
    let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Beacon(beacon_event))) =
        event.kind.raw().deserialize()
    else {
        return None;
    };

    let original = beacon_event.as_original()?;
    Some(LastLocation {
        location: original.content.location.clone(),
        ts: original.origin_server_ts,
    })
}
/// Applies an incoming beacon (location update) event to the list.
///
/// The share is found by comparing each `beacon_id` with the event's
/// `relates_to.event_id`; events that match no share are ignored. If the
/// matching share's beacon info no longer reports `is_live()`, the share is
/// removed instead of updated. Otherwise its `last_location` is replaced with
/// the event's location and `origin_server_ts`.
fn handle_beacon_event(
    shares: &Mutex<ObservableVector<LiveLocationShare>>,
    event: OriginalSyncBeaconEvent,
) {
    let mut list = shares.lock();

    let target_id = &event.content.relates_to.event_id;
    let Some(idx) = list.iter().position(|share| share.beacon_id == *target_id) else {
        return;
    };

    // A share that is no longer live gets dropped rather than refreshed.
    if !list[idx].beacon_info.is_live() {
        list.remove(idx);
        return;
    }

    // Entries are replaced wholesale through `set`, so work on a copy.
    let mut updated = list[idx].clone();
    updated.last_location =
        Some(LastLocation { location: event.content.location, ts: event.origin_server_ts });
    list.set(idx, updated);
}
/// Applies an incoming beacon-info (share start/stop) event to the list.
///
/// The first share whose `user_id` equals the event's state key is removed.
/// If the new content reports `is_live()`, a fresh share is then appended,
/// with its last location looked up through the room's event cache (`None`
/// when the cache is unavailable or holds no location).
async fn handle_beacon_info_event(
    shares: &Mutex<ObservableVector<LiveLocationShare>>,
    room: &Room,
    event: OriginalSyncBeaconInfoEvent,
) {
    // Scoped so the lock is released before any `.await` below.
    {
        let mut list = shares.lock();
        let previous = list.iter().position(|share| share.user_id == *event.state_key);
        if let Some(idx) = previous {
            list.remove(idx);
        }
    }

    if !event.content.is_live() {
        return;
    }

    let last_location = match room.event_cache().await {
        // `_drop_handles` stays bound (and alive) for the duration of the lookup.
        Ok((cache, _drop_handles)) => Self::find_last_location(&cache, &event.event_id).await,
        Err(_) => None,
    };

    shares.lock().push_back(LiveLocationShare {
        user_id: event.state_key,
        beacon_id: event.event_id,
        beacon_info: event.content,
        last_location,
    });
}
}