use std::collections::HashMap;
use std::sync::Arc;
use livekit::id::{ParticipantIdentity, TrackSid};
use smallvec::SmallVec;
use tracing::{debug, info};
use crate::protocol::v2::server::advertise;
use crate::remote_access::channel_subscription::ChannelSubscription;
use crate::remote_access::participant::Participant;
use crate::remote_access::session::{VideoInputSchema, VideoMetadata, VideoPublisher};
use crate::{ChannelId, RawChannel};
pub(crate) struct RemovedSubscriptions {
pub last_unsubscribed: SmallVec<[ChannelId; 4]>,
pub last_video_unsubscribed: SmallVec<[ChannelId; 4]>,
}
pub(crate) struct SessionState {
participants: HashMap<ParticipantIdentity, Arc<Participant>>,
channels: HashMap<ChannelId, Arc<RawChannel>>,
subscriptions: HashMap<ChannelId, SmallVec<[ParticipantIdentity; 1]>>,
data_subscriptions: HashMap<ChannelId, ChannelSubscription>,
video_subscribers: HashMap<ChannelId, SmallVec<[ParticipantIdentity; 1]>>,
video_schemas: HashMap<ChannelId, VideoInputSchema>,
video_publishers: HashMap<ChannelId, Arc<VideoPublisher>>,
video_track_sids: HashMap<ChannelId, TrackSid>,
video_metadata: HashMap<ChannelId, VideoMetadata>,
}
impl SessionState {
pub fn new() -> Self {
Self {
participants: HashMap::new(),
channels: HashMap::new(),
subscriptions: HashMap::new(),
data_subscriptions: HashMap::new(),
video_subscribers: HashMap::new(),
video_schemas: HashMap::new(),
video_publishers: HashMap::new(),
video_track_sids: HashMap::new(),
video_metadata: HashMap::new(),
}
}
pub fn insert_participant(
&mut self,
identity: ParticipantIdentity,
participant: Arc<Participant>,
) -> Arc<Participant> {
use std::collections::hash_map::Entry;
match self.participants.entry(identity) {
Entry::Occupied(e) => e.get().clone(),
Entry::Vacant(v) => {
v.insert(participant.clone());
participant
}
}
}
#[must_use]
pub fn remove_participant(&mut self, identity: &ParticipantIdentity) -> RemovedSubscriptions {
if self.participants.remove(identity).is_none() {
return RemovedSubscriptions {
last_unsubscribed: SmallVec::new(),
last_video_unsubscribed: SmallVec::new(),
};
}
info!("removed participant {identity:?}");
let mut last_unsubscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
for (&channel_id, subscribers) in &mut self.subscriptions {
if let Some(pos) = subscribers.iter().position(|id| id == identity) {
subscribers.swap_remove(pos);
if subscribers.is_empty() {
last_unsubscribed.push(channel_id);
}
}
}
for sub in self.data_subscriptions.values_mut() {
sub.remove(identity);
}
let mut last_video_unsubscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
self.video_subscribers.retain(|&channel_id, subscribers| {
subscribers.retain(|id| id != identity);
if subscribers.is_empty() {
last_video_unsubscribed.push(channel_id);
false
} else {
true
}
});
RemovedSubscriptions {
last_unsubscribed,
last_video_unsubscribed,
}
}
pub fn get_participant(&self, identity: &ParticipantIdentity) -> Option<Arc<Participant>> {
self.participants.get(identity).cloned()
}
pub fn collect_participants(&self) -> SmallVec<[Arc<Participant>; 8]> {
self.participants.values().cloned().collect()
}
pub fn insert_channel(&mut self, channel: &Arc<RawChannel>) {
self.channels.insert(channel.id(), channel.clone());
}
pub fn has_channel(&self, channel_id: &ChannelId) -> bool {
self.channels.contains_key(channel_id)
}
pub fn remove_channel(&mut self, channel_id: ChannelId) -> bool {
self.subscriptions.remove(&channel_id);
self.data_subscriptions.remove(&channel_id);
self.video_subscribers.remove(&channel_id);
self.video_metadata.remove(&channel_id);
self.channels.remove(&channel_id).is_some()
}
pub fn with_channels<R>(
&self,
f: impl FnOnce(&HashMap<ChannelId, Arc<RawChannel>>) -> R,
) -> Option<R> {
if self.channels.is_empty() {
return None;
}
Some(f(&self.channels))
}
pub fn insert_video_schema(&mut self, channel_id: ChannelId, schema: VideoInputSchema) {
self.video_schemas.insert(channel_id, schema);
}
pub fn get_video_schema(&self, channel_id: &ChannelId) -> Option<VideoInputSchema> {
self.video_schemas.get(channel_id).copied()
}
pub fn remove_video_schema(&mut self, channel_id: &ChannelId) {
self.video_schemas.remove(channel_id);
self.video_metadata.remove(channel_id);
}
pub fn insert_video_publisher(
&mut self,
channel_id: ChannelId,
publisher: Arc<VideoPublisher>,
) {
self.video_publishers.insert(channel_id, publisher);
}
pub fn get_video_publisher(&self, channel_id: &ChannelId) -> Option<Arc<VideoPublisher>> {
self.video_publishers.get(channel_id).cloned()
}
pub fn remove_video_publisher(&mut self, channel_id: &ChannelId) {
self.video_publishers.remove(channel_id);
}
pub fn insert_video_track_sid(&mut self, channel_id: ChannelId, sid: TrackSid) {
self.video_track_sids.insert(channel_id, sid);
}
pub fn remove_video_track_sid(&mut self, channel_id: &ChannelId) -> Option<TrackSid> {
self.video_track_sids.remove(channel_id)
}
pub fn iter_video_publishers(
&self,
) -> impl Iterator<Item = (&ChannelId, &Arc<VideoPublisher>)> {
self.video_publishers.iter()
}
pub fn insert_video_metadata(&mut self, channel_id: ChannelId, metadata: VideoMetadata) {
self.video_metadata.insert(channel_id, metadata);
}
#[cfg(test)]
pub fn remove_video_metadata(&mut self, channel_id: &ChannelId) {
self.video_metadata.remove(channel_id);
}
pub fn add_metadata_to_advertisement(&self, advertise: &mut advertise::Advertise<'_>) {
for ch in &mut advertise.channels {
let channel_id = ChannelId::new(ch.id);
if self.video_schemas.contains_key(&channel_id) {
ch.metadata
.insert("foxglove.hasVideoTrack".to_string(), "true".to_string());
}
if let Some(meta) = self.video_metadata.get(&channel_id) {
ch.metadata.insert(
"foxglove.videoSourceEncoding".to_string(),
meta.encoding.as_str().to_string(),
);
if !meta.frame_id.is_empty() {
ch.metadata
.insert("foxglove.videoFrameId".to_string(), meta.frame_id.clone());
}
}
}
}
#[must_use]
pub fn subscribe(
&mut self,
participant: &Participant,
channel_ids: &[ChannelId],
) -> SmallVec<[ChannelId; 4]> {
let mut first_subscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
for &channel_id in channel_ids {
let subscribers = self.subscriptions.entry(channel_id).or_default();
if subscribers.contains(participant.identity()) {
info!("{participant} is already subscribed to channel {channel_id:?}; ignoring");
continue;
}
let is_first = subscribers.is_empty();
subscribers.push(participant.identity().clone());
debug!("{participant} subscribed to channel {channel_id:?}");
if is_first {
first_subscribed.push(channel_id);
}
}
first_subscribed
}
#[must_use]
pub fn unsubscribe(
&mut self,
participant: &Participant,
channel_ids: &[ChannelId],
) -> SmallVec<[ChannelId; 4]> {
let mut last_unsubscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
for &channel_id in channel_ids {
let Some(subscribers) = self.subscriptions.get_mut(&channel_id) else {
info!("{participant} is not subscribed to channel {channel_id:?}; ignoring");
continue;
};
let Some(pos) = subscribers
.iter()
.position(|id| id == participant.identity())
else {
info!("{participant} is not subscribed to channel {channel_id:?}; ignoring");
continue;
};
subscribers.swap_remove(pos);
debug!("{participant} unsubscribed from channel {channel_id:?}");
if subscribers.is_empty() {
last_unsubscribed.push(channel_id);
}
}
last_unsubscribed
}
pub fn subscribe_data(&mut self, participant: &Participant, channel_ids: &[ChannelId]) {
for &channel_id in channel_ids {
let sub = self
.data_subscriptions
.entry(channel_id)
.or_insert_with(ChannelSubscription::new);
if sub.subscribers().contains(participant.identity()) {
continue;
}
sub.add(participant.identity().clone());
}
}
pub fn unsubscribe_data(&mut self, participant: &Participant, channel_ids: &[ChannelId]) {
for &channel_id in channel_ids {
let Some(sub) = self.data_subscriptions.get_mut(&channel_id) else {
continue;
};
if !sub.remove(participant.identity()) {
continue;
}
}
}
#[must_use]
pub fn subscribe_video(
&mut self,
participant: &Participant,
channel_ids: &[ChannelId],
) -> SmallVec<[ChannelId; 4]> {
let mut first_subscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
for &channel_id in channel_ids {
let subscribers = self.video_subscribers.entry(channel_id).or_default();
if subscribers.contains(participant.identity()) {
continue;
}
let is_first = subscribers.is_empty();
subscribers.push(participant.identity().clone());
if is_first {
first_subscribed.push(channel_id);
}
}
first_subscribed
}
#[must_use]
pub fn unsubscribe_video(
&mut self,
participant: &Participant,
channel_ids: &[ChannelId],
) -> SmallVec<[ChannelId; 4]> {
let mut last_unsubscribed: SmallVec<[ChannelId; 4]> = SmallVec::new();
for &channel_id in channel_ids {
let Some(subscribers) = self.video_subscribers.get_mut(&channel_id) else {
continue;
};
let Some(pos) = subscribers
.iter()
.position(|id| id == participant.identity())
else {
continue;
};
subscribers.swap_remove(pos);
if subscribers.is_empty() {
self.video_subscribers.remove(&channel_id);
last_unsubscribed.push(channel_id);
}
}
last_unsubscribed
}
pub fn has_data_subscribers(&self, channel_id: &ChannelId) -> bool {
self.get_data_subscription(channel_id).is_some()
}
pub fn get_data_subscription(&self, channel_id: &ChannelId) -> Option<&ChannelSubscription> {
let sub = self.data_subscriptions.get(channel_id)?;
if sub.is_empty() { None } else { Some(sub) }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::img2yuv::{ImageEncoding, RawImageEncoding};
use crate::remote_access::participant::ParticipantWriter;
fn make_participant(name: &str) -> (ParticipantIdentity, Arc<Participant>) {
let identity = ParticipantIdentity(name.to_string());
let writer = Arc::new(crate::remote_access::participant::TestByteStreamWriter::default());
let participant = Arc::new(Participant::new(
identity.clone(),
ParticipantWriter::Test(writer),
));
(identity, participant)
}
fn make_channel(topic: &str) -> Arc<RawChannel> {
use crate::{ChannelBuilder, Context, Schema};
let ctx = Context::new();
ChannelBuilder::new(topic)
.context(&ctx)
.message_encoding("json")
.schema(Schema::new("S", "jsonschema", b"{}"))
.build_raw()
.unwrap()
}
#[test]
fn insert_new_participant() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
let stored = state.insert_participant(id.clone(), p);
assert_eq!(stored.identity(), &id);
assert!(Arc::ptr_eq(&stored, &state.get_participant(&id).unwrap()));
}
#[test]
fn insert_duplicate_returns_existing() {
let mut state = SessionState::new();
let (id, p1) = make_participant("alice");
let stored1 = state.insert_participant(id.clone(), p1);
let (_id2, p2) = make_participant("bob");
let stored2 = state.insert_participant(id, p2);
assert!(Arc::ptr_eq(&stored1, &stored2));
}
#[test]
fn get_participant_returns_existing() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
state.insert_participant(id.clone(), p);
assert!(state.get_participant(&id).is_some());
}
#[test]
fn get_participant_returns_none_for_missing() {
let state = SessionState::new();
let id = ParticipantIdentity("nobody".to_string());
assert!(state.get_participant(&id).is_none());
}
#[test]
fn remove_missing_participant_is_noop() {
let mut state = SessionState::new();
let id = ParticipantIdentity("nobody".to_string());
let removed = state.remove_participant(&id);
assert!(removed.last_unsubscribed.is_empty());
assert!(removed.last_video_unsubscribed.is_empty());
}
#[test]
fn remove_participant_cleans_up_subscriptions() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
state.insert_participant(id.clone(), p.clone());
let ch = ChannelId::new(1);
let _ = state.subscribe(&p, &[ch]);
state.subscribe_data(&p, &[ch]);
let removed = state.remove_participant(&id);
assert_eq!(removed.last_unsubscribed.as_slice(), &[ch]);
assert!(!state.has_data_subscribers(&ch));
}
#[test]
fn remove_participant_reports_only_last_unsubscribed_channels() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
state.insert_participant(id_a.clone(), pa.clone());
state.insert_participant(id_b.clone(), pb.clone());
let ch1 = ChannelId::new(10);
let ch2 = ChannelId::new(20);
let _ = state.subscribe(&pa, &[ch1, ch2]);
state.subscribe_data(&pa, &[ch1, ch2]);
let _ = state.subscribe(&pb, &[ch1]);
state.subscribe_data(&pb, &[ch1]);
let removed = state.remove_participant(&id_a);
assert_eq!(removed.last_unsubscribed.as_slice(), &[ch2]);
assert_eq!(state.subscriptions[&ch1].len(), 1);
}
#[test]
fn remove_participant_cleans_up_video_subscriptions() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
state.insert_participant(id.clone(), p.clone());
let ch = ChannelId::new(1);
let _ = state.subscribe(&p, &[ch]);
let _ = state.subscribe_video(&p, &[ch]);
let removed = state.remove_participant(&id);
assert_eq!(removed.last_unsubscribed.as_slice(), &[ch]);
assert_eq!(removed.last_video_unsubscribed.as_slice(), &[ch]);
}
#[test]
fn insert_and_query_channel() {
let mut state = SessionState::new();
let ch = make_channel("/topic1");
state.insert_channel(&ch);
assert_eq!(state.channels.len(), 1);
}
#[test]
fn remove_channel_returns_true_when_present() {
let mut state = SessionState::new();
let ch = make_channel("/topic1");
state.insert_channel(&ch);
assert!(state.remove_channel(ch.id()));
}
#[test]
fn remove_channel_returns_false_when_absent() {
let mut state = SessionState::new();
assert!(!state.remove_channel(ChannelId::new(999)));
}
#[test]
fn first_subscriber_is_reported() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let first = state.subscribe(&p, &[ch]);
assert_eq!(first.as_slice(), &[ch]);
}
#[test]
fn second_subscriber_is_not_reported_as_first() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
let _ = state.subscribe(&pa, &[ch]);
let first = state.subscribe(&pb, &[ch]);
assert!(first.is_empty());
}
#[test]
fn duplicate_subscribe_is_idempotent() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe(&p, &[ch]);
let first = state.subscribe(&p, &[ch]);
assert!(first.is_empty());
assert_eq!(state.subscriptions[&ch].len(), 1);
}
#[test]
fn subscribe_multiple_channels_at_once() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch1 = ChannelId::new(1);
let ch2 = ChannelId::new(2);
let first = state.subscribe(&p, &[ch1, ch2]);
assert_eq!(first.len(), 2);
assert!(first.contains(&ch1));
assert!(first.contains(&ch2));
}
#[test]
fn last_unsubscriber_is_reported() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe(&p, &[ch]);
let last = state.unsubscribe(&p, &[ch]);
assert_eq!(last.as_slice(), &[ch]);
}
#[test]
fn unsubscribe_with_remaining_subscribers_is_not_reported() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
let _ = state.subscribe(&pa, &[ch]);
let _ = state.subscribe(&pb, &[ch]);
let last = state.unsubscribe(&pa, &[ch]);
assert!(last.is_empty());
assert_eq!(state.subscriptions[&ch].len(), 1);
}
#[test]
fn unsubscribe_when_not_subscribed_is_noop() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let last = state.unsubscribe(&p, &[ch]);
assert!(last.is_empty());
}
#[test]
fn unsubscribe_multiple_channels_at_once() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch1 = ChannelId::new(1);
let ch2 = ChannelId::new(2);
let _ = state.subscribe(&p, &[ch1, ch2]);
let last = state.unsubscribe(&p, &[ch1, ch2]);
assert_eq!(last.len(), 2);
assert!(last.contains(&ch1));
assert!(last.contains(&ch2));
}
#[test]
fn get_data_subscription_returns_none_for_no_subscriptions() {
let state = SessionState::new();
assert!(state.get_data_subscription(&ChannelId::new(1)).is_none());
}
#[test]
fn get_data_subscription_returns_subscriber_identities() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
state.subscribe_data(&pa, &[ch]);
state.subscribe_data(&pb, &[ch]);
let sub = state.get_data_subscription(&ch).unwrap();
assert_eq!(sub.subscribers().len(), 2);
assert!(sub.subscribers().contains(&id_a));
assert!(sub.subscribers().contains(&id_b));
}
#[test]
fn subscription_version_increments_on_subscribe() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
state.subscribe_data(&pa, &[ch]);
let v1 = state.get_data_subscription(&ch).unwrap().version();
state.subscribe_data(&pb, &[ch]);
let v2 = state.get_data_subscription(&ch).unwrap().version();
assert_ne!(v1, v2);
}
#[test]
fn subscription_version_does_not_increment_on_duplicate_subscribe() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
state.subscribe_data(&p, &[ch]);
let v1 = state.get_data_subscription(&ch).unwrap().version();
state.subscribe_data(&p, &[ch]);
let v2 = state.get_data_subscription(&ch).unwrap().version();
assert_eq!(v1, v2);
}
#[test]
fn subscription_version_increments_on_unsubscribe() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
state.subscribe_data(&pa, &[ch]);
state.subscribe_data(&pb, &[ch]);
let v1 = state.get_data_subscription(&ch).unwrap().version();
state.unsubscribe_data(&pa, &[ch]);
let v2 = state.get_data_subscription(&ch).unwrap().version();
assert_ne!(v1, v2);
}
#[test]
fn subscription_version_increments_on_remove_participant() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
state.insert_participant(id_a.clone(), pa.clone());
state.insert_participant(id_b, pb.clone());
let _ = state.subscribe(&pa, &[ch]);
let _ = state.subscribe(&pb, &[ch]);
state.subscribe_data(&pa, &[ch]);
state.subscribe_data(&pb, &[ch]);
let v1 = state.get_data_subscription(&ch).unwrap().version();
let _ = state.remove_participant(&id_a);
let v2 = state.get_data_subscription(&ch).unwrap().version();
assert_ne!(v1, v2);
}
#[test]
fn version_preserved_across_unsubscribe_resubscribe() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
state.subscribe_data(&p, &[ch]);
let v1 = state.data_subscriptions.get(&ch).unwrap().version();
state.unsubscribe_data(&p, &[ch]);
let v2 = state.data_subscriptions.get(&ch).unwrap().version();
assert_ne!(v1, v2, "unsubscribe should bump version");
state.subscribe_data(&p, &[ch]);
let v3 = state.data_subscriptions.get(&ch).unwrap().version();
assert_ne!(v2, v3, "resubscribe should bump version");
assert_ne!(v1, v3, "resubscribe version should differ from original");
}
#[test]
fn version_preserved_across_remove_participant_resubscribe() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
state.insert_participant(id.clone(), p.clone());
let ch = ChannelId::new(1);
let _ = state.subscribe(&p, &[ch]);
state.subscribe_data(&p, &[ch]);
let v1 = state.data_subscriptions.get(&ch).unwrap().version();
let _ = state.remove_participant(&id);
let v2 = state.data_subscriptions.get(&ch).unwrap().version();
assert_ne!(v1, v2, "remove_participant should bump version");
let (id2, p2) = make_participant("alice");
state.insert_participant(id2, p2.clone());
state.subscribe_data(&p2, &[ch]);
let v3 = state.data_subscriptions.get(&ch).unwrap().version();
assert_ne!(v2, v3, "resubscribe after remove should bump version");
}
#[test]
fn collect_participants_yields_all() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
state.insert_participant(id_a, pa);
state.insert_participant(id_b, pb);
assert_eq!(state.collect_participants().len(), 2);
}
#[test]
fn first_video_subscriber_is_reported() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let first = state.subscribe_video(&p, &[ch]);
assert_eq!(first.as_slice(), &[ch]);
}
#[test]
fn second_video_subscriber_is_not_reported_as_first() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&pa, &[ch]);
let first = state.subscribe_video(&pb, &[ch]);
assert!(first.is_empty());
}
#[test]
fn duplicate_video_subscribe_is_idempotent() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&p, &[ch]);
let first = state.subscribe_video(&p, &[ch]);
assert!(first.is_empty());
}
#[test]
fn last_video_unsubscriber_is_reported() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&p, &[ch]);
let last = state.unsubscribe_video(&p, &[ch]);
assert_eq!(last.as_slice(), &[ch]);
}
#[test]
fn video_unsubscribe_with_remaining_is_not_reported() {
let mut state = SessionState::new();
let (_id_a, pa) = make_participant("alice");
let (_id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&pa, &[ch]);
let _ = state.subscribe_video(&pb, &[ch]);
let last = state.unsubscribe_video(&pa, &[ch]);
assert!(last.is_empty());
}
#[test]
fn video_unsubscribe_when_not_subscribed_is_noop() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let last = state.unsubscribe_video(&p, &[ch]);
assert!(last.is_empty());
}
#[test]
fn no_subscribers_means_no_data_subscribers() {
let state = SessionState::new();
let ch = ChannelId::new(1);
assert!(!state.has_data_subscribers(&ch));
assert!(state.get_data_subscription(&ch).is_none());
}
#[test]
fn data_only_subscriber() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
let ch = ChannelId::new(1);
state.subscribe_data(&p, &[ch]);
assert!(state.has_data_subscribers(&ch));
let subs = state.get_data_subscription(&ch).unwrap();
assert_eq!(subs.subscribers().len(), 1);
assert!(subs.subscribers().contains(&id));
}
#[test]
fn video_only_subscriber_is_not_a_data_subscriber() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&p, &[ch]);
assert!(!state.has_data_subscribers(&ch));
assert!(state.get_data_subscription(&ch).is_none());
}
#[test]
fn mixed_subscribers_data_and_video() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&pa, &[ch]);
state.subscribe_data(&pb, &[ch]);
assert!(state.has_data_subscribers(&ch));
let subs = state.get_data_subscription(&ch).unwrap();
assert_eq!(subs.subscribers().len(), 1);
assert!(subs.subscribers().contains(&id_b));
assert!(!subs.subscribers().contains(&id_a));
}
#[test]
fn switching_from_video_to_data() {
let mut state = SessionState::new();
let (id, p) = make_participant("alice");
let ch = ChannelId::new(1);
let _ = state.subscribe_video(&p, &[ch]);
assert!(!state.has_data_subscribers(&ch));
let _ = state.unsubscribe_video(&p, &[ch]);
state.subscribe_data(&p, &[ch]);
assert!(state.has_data_subscribers(&ch));
let subs = state.get_data_subscription(&ch).unwrap();
assert!(subs.subscribers().contains(&id));
}
#[test]
fn switching_from_data_to_video() {
let mut state = SessionState::new();
let (_id, p) = make_participant("alice");
let ch = ChannelId::new(1);
state.subscribe_data(&p, &[ch]);
assert!(state.has_data_subscribers(&ch));
state.unsubscribe_data(&p, &[ch]);
let _ = state.subscribe_video(&p, &[ch]);
assert!(!state.has_data_subscribers(&ch));
assert!(state.video_subscribers.contains_key(&ch));
}
#[test]
fn remove_participant_with_mixed_video_preferences() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
state.insert_participant(id_a.clone(), pa.clone());
state.insert_participant(id_b.clone(), pb.clone());
let ch = ChannelId::new(1);
let _ = state.subscribe(&pa, &[ch]);
let _ = state.subscribe(&pb, &[ch]);
let _ = state.subscribe_video(&pa, &[ch]);
state.subscribe_data(&pb, &[ch]);
let removed = state.remove_participant(&id_a);
assert!(removed.last_unsubscribed.is_empty(), "bob still subscribed");
assert_eq!(removed.last_video_unsubscribed.as_slice(), &[ch]);
assert!(state.has_data_subscribers(&ch));
}
#[test]
fn add_metadata_to_advertisement_injects_video_metadata() {
let mut state = SessionState::new();
let ch = make_channel("/camera");
state.insert_channel(&ch);
state.insert_video_schema(ch.id(), VideoInputSchema::FoxgloveRawImage);
let mut msg = advertise::advertise_channels(std::iter::once(&ch)).into_owned();
state.add_metadata_to_advertisement(&mut msg);
assert_eq!(msg.channels.len(), 1);
assert_eq!(
msg.channels[0].metadata.get("foxglove.hasVideoTrack"),
Some(&"true".to_string()),
);
assert!(
!msg.channels[0]
.metadata
.contains_key("foxglove.videoSourceEncoding")
);
assert!(
!msg.channels[0]
.metadata
.contains_key("foxglove.videoFrameId")
);
state.insert_video_metadata(
ch.id(),
VideoMetadata {
encoding: ImageEncoding::Raw(RawImageEncoding::Rgb8),
frame_id: "camera_optical_frame".to_string(),
},
);
let mut msg = advertise::advertise_channels(std::iter::once(&ch)).into_owned();
state.add_metadata_to_advertisement(&mut msg);
assert_eq!(
msg.channels[0].metadata.get("foxglove.videoSourceEncoding"),
Some(&"rgb8".to_string()),
);
assert_eq!(
msg.channels[0].metadata.get("foxglove.videoFrameId"),
Some(&"camera_optical_frame".to_string()),
);
}
#[test]
fn add_metadata_to_advertisement_omits_empty_frame_id() {
let mut state = SessionState::new();
let ch = make_channel("/camera");
state.insert_channel(&ch);
state.insert_video_schema(ch.id(), VideoInputSchema::FoxgloveRawImage);
state.insert_video_metadata(
ch.id(),
VideoMetadata {
encoding: ImageEncoding::Raw(RawImageEncoding::Mono8),
frame_id: String::new(),
},
);
let mut msg = advertise::advertise_channels(std::iter::once(&ch)).into_owned();
state.add_metadata_to_advertisement(&mut msg);
assert_eq!(
msg.channels[0].metadata.get("foxglove.videoSourceEncoding"),
Some(&"mono8".to_string()),
);
assert!(
!msg.channels[0]
.metadata
.contains_key("foxglove.videoFrameId"),
"empty frame_id should not be advertised"
);
}
#[test]
fn remove_video_metadata_clears_from_advertisement() {
let mut state = SessionState::new();
let ch = make_channel("/camera");
state.insert_channel(&ch);
state.insert_video_schema(ch.id(), VideoInputSchema::FoxgloveRawImage);
state.insert_video_metadata(
ch.id(),
VideoMetadata {
encoding: ImageEncoding::Raw(RawImageEncoding::Rgb8),
frame_id: "frame".to_string(),
},
);
state.remove_video_metadata(&ch.id());
let mut msg = advertise::advertise_channels(std::iter::once(&ch)).into_owned();
state.add_metadata_to_advertisement(&mut msg);
assert_eq!(
msg.channels[0].metadata.get("foxglove.hasVideoTrack"),
Some(&"true".to_string()),
);
assert!(
!msg.channels[0]
.metadata
.contains_key("foxglove.videoSourceEncoding")
);
assert!(
!msg.channels[0]
.metadata
.contains_key("foxglove.videoFrameId")
);
}
#[test]
fn remove_participant_video_subscriber_while_other_video_remains() {
let mut state = SessionState::new();
let (id_a, pa) = make_participant("alice");
let (id_b, pb) = make_participant("bob");
state.insert_participant(id_a.clone(), pa.clone());
state.insert_participant(id_b.clone(), pb.clone());
let ch = ChannelId::new(1);
let _ = state.subscribe(&pa, &[ch]);
let _ = state.subscribe(&pb, &[ch]);
let _ = state.subscribe_video(&pa, &[ch]);
let _ = state.subscribe_video(&pb, &[ch]);
let removed = state.remove_participant(&id_a);
assert!(removed.last_unsubscribed.is_empty());
assert!(removed.last_video_unsubscribed.is_empty());
assert!(!state.has_data_subscribers(&ch));
}
}