use std::{
collections::HashMap,
sync::{
Arc, OnceLock, Weak,
atomic::{AtomicU8, Ordering},
},
time::Duration,
};
use indexmap::IndexSet;
use livekit::{Room, RoomEvent, RoomOptions};
use tokio::{runtime::Handle, sync::OnceCell, sync::mpsc::UnboundedReceiver, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::{
Context, FoxgloveError, SinkChannelFilter, SinkId,
api_client::{
DeviceResponse, DeviceToken, FoxgloveApiClient, FoxgloveApiClientBuilder, WatchQuery,
WatchWakeEvent,
},
library_version::get_library_version,
protocol::v2::{parameter::Parameter, server::ServerInfo},
remote_access::{
AssetHandler, Capability, RemoteAccessError,
protocol_version::{self, REMOTE_ACCESS_PROTOCOL_VERSION},
qos::QosClassifier,
session::{RemoteAccessSession, SessionParams},
watch::Watch,
watch_loop::{
ConnectAction, INITIAL_BACKOFF, MAX_BACKOFF, WatchAction, WatchRetryState,
on_connect_error, on_connect_success, on_outcome,
},
},
remote_common::{
connection_graph::ConnectionGraph,
parameters::ParameterHandler,
service::{Service, ServiceId, ServiceMap},
},
};
/// Module-local result alias whose error type is a boxed [`RemoteAccessError`].
type Result<T> = std::result::Result<T, Box<RemoteAccessError>>;
use super::session::DEFAULT_MESSAGE_BACKLOG_SIZE;
/// Lifecycle state of a remote access connection.
///
/// The explicit `u8` discriminants allow the state to be stored in an `AtomicU8`
/// and recovered with [`ConnectionStatus::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ConnectionStatus {
    /// Discriminant `0`.
    Connecting = 0,
    /// Discriminant `1`.
    Connected = 1,
    /// Discriminant `2`.
    ShuttingDown = 2,
    /// Discriminant `3`.
    Shutdown = 3,
}

impl std::fmt::Display for ConnectionStatus {
    /// Writes the lowercase, human-readable label of the status.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Connecting => "connecting",
            Self::Connected => "connected",
            Self::ShuttingDown => "shutting down",
            Self::Shutdown => "shutdown",
        };
        f.write_str(label)
    }
}

impl ConnectionStatus {
    /// Converts a raw discriminant back into a status.
    ///
    /// # Panics
    ///
    /// Panics if `value` is not one of the declared discriminants (`0..=3`).
    fn from_u8(value: u8) -> Self {
        // Indexed by discriminant; the order must match the enum declaration.
        const BY_DISCRIMINANT: [ConnectionStatus; 4] = [
            ConnectionStatus::Connecting,
            ConnectionStatus::Connected,
            ConnectionStatus::ShuttingDown,
            ConnectionStatus::Shutdown,
        ];
        match BY_DISCRIMINANT.get(usize::from(value)) {
            Some(status) => *status,
            None => unreachable!("invalid ConnectionStatus value: {value}"),
        }
    }
}
/// Configuration consumed by `RemoteAccessConnection::new`; every field is moved
/// into the connection's field of the same name.
pub(super) struct ConnectionParams {
/// Optional server name; when absent, the fetched device name is used for the server info.
pub(super) name: Option<String>,
/// Token used to build the device-authenticated API client.
pub(super) device_token: String,
/// Optional base URL passed to the API client builder.
pub(super) foxglove_api_url: Option<String>,
/// Optional timeout passed to the API client builder.
pub(super) foxglove_api_timeout: Option<Duration>,
/// Optional listener; receives connection status changes and is handed to each session.
pub(super) listener: Option<Arc<dyn super::Listener>>,
/// Capabilities advertised in the server info and checked by capability-gated methods.
pub(super) capabilities: Vec<Capability>,
/// Optional set of supported encodings; an empty set is used for sessions when absent.
pub(super) supported_encodings: Option<IndexSet<String>>,
/// Optional asset handler forwarded to each session.
pub(super) fetch_asset_handler: Option<Arc<dyn AssetHandler>>,
/// Optional parameter handler forwarded to each session.
pub(super) parameter_handler: Option<Arc<dyn ParameterHandler>>,
/// Runtime handle on which the connection's run loop is spawned.
pub(super) runtime: Handle,
/// Optional channel filter forwarded to each session.
pub(super) channel_filter: Option<Arc<dyn SinkChannelFilter>>,
/// Optional QoS classifier forwarded to each session.
pub(super) qos_classifier: Option<Arc<dyn QosClassifier>>,
/// Optional extra metadata merged into the server info metadata.
pub(super) server_info: Option<HashMap<String, String>>,
/// Optional backlog size; `DEFAULT_MESSAGE_BACKLOG_SIZE` is used when absent.
pub(super) message_backlog_size: Option<usize>,
/// Weak reference to the context that sessions are added to as sinks.
pub(super) context: Weak<Context>,
}
/// Device information together with the API client that fetched it.
///
/// Created once by `get_or_init_device_context` and cached on the connection.
struct DeviceContext {
/// Response of `fetch_device_info`; its `id` and `name` are logged on initialization.
device: DeviceResponse,
/// Device-token API client, shared with each watch via `Arc`.
client: Arc<FoxgloveApiClient<DeviceToken>>,
}
/// Everything the watch loop hands to `run_session` when a wake arrives.
#[derive(Debug)]
struct WakeSignal {
/// The wake event; its `url`, `token` and optional `remote_access_session_id` are used to join the room.
wake: WatchWakeEvent,
/// Value reported by the watch that produced the wake; forwarded to the session parameters.
device_wait_for_viewer: Duration,
}
/// A remote access connection: watches for wake events and, on each wake, joins a
/// room and runs a `RemoteAccessSession` until it ends or the connection is shut down.
pub(super) struct RemoteAccessConnection {
// --- Configuration copied verbatim from `ConnectionParams` in `new` ---
name: Option<String>,
device_token: String,
foxglove_api_url: Option<String>,
foxglove_api_timeout: Option<Duration>,
listener: Option<Arc<dyn super::Listener>>,
capabilities: Vec<Capability>,
supported_encodings: Option<IndexSet<String>>,
fetch_asset_handler: Option<Arc<dyn AssetHandler>>,
parameter_handler: Option<Arc<dyn ParameterHandler>>,
runtime: Handle,
channel_filter: Option<Arc<dyn SinkChannelFilter>>,
qos_classifier: Option<Arc<dyn QosClassifier>>,
server_info: Option<HashMap<String, String>>,
message_backlog_size: Option<usize>,
context: Weak<Context>,
// --- Runtime state ---
/// Cancelled by `shutdown` and when the device token is rejected as unauthorized;
/// each session receives a child token.
cancellation_token: CancellationToken,
/// Service registry, shared with each session.
services: Arc<parking_lot::RwLock<ServiceMap>>,
/// Connection graph, shared with each session.
connection_graph: Arc<parking_lot::Mutex<ConnectionGraph>>,
/// The currently running session, or `None` while no session is active.
session: parking_lot::Mutex<Option<Arc<RemoteAccessSession>>>,
/// Lazily initialized device info and API client.
device_context: OnceCell<DeviceContext>,
/// Current `ConnectionStatus`, stored as its `u8` discriminant.
status: AtomicU8,
/// Session ID taken from the first wake that carries one; never replaced afterwards.
remote_access_session_id: OnceLock<String>,
}
impl RemoteAccessConnection {
pub fn new(params: ConnectionParams, services: Arc<parking_lot::RwLock<ServiceMap>>) -> Self {
Self {
name: params.name,
device_token: params.device_token,
foxglove_api_url: params.foxglove_api_url,
foxglove_api_timeout: params.foxglove_api_timeout,
listener: params.listener,
capabilities: params.capabilities,
supported_encodings: params.supported_encodings,
fetch_asset_handler: params.fetch_asset_handler,
parameter_handler: params.parameter_handler,
runtime: params.runtime,
channel_filter: params.channel_filter,
qos_classifier: params.qos_classifier,
server_info: params.server_info,
message_backlog_size: params.message_backlog_size,
context: params.context,
cancellation_token: CancellationToken::new(),
services,
connection_graph: Arc::new(parking_lot::Mutex::new(ConnectionGraph::new())),
session: parking_lot::Mutex::new(None),
device_context: OnceCell::new(),
status: AtomicU8::new(ConnectionStatus::Connecting as u8),
remote_access_session_id: OnceLock::new(),
}
}
/// Returns the current connection status.
pub fn status(&self) -> ConnectionStatus {
    let raw = self.status.load(Ordering::Relaxed);
    ConnectionStatus::from_u8(raw)
}
/// Returns the sink ID of the active session, or `None` when no session is running.
pub fn sink_id(&self) -> Option<SinkId> {
    let active = self.session.lock();
    active.as_ref().map(|session| session.sink_id())
}
/// Forwards `parameters` to the active session; does nothing when no session is running.
pub fn publish_parameter_values(&self, parameters: Vec<Parameter>) {
    // The guard stays alive for the whole call, so the session slot remains locked
    // while the session publishes.
    let active = self.session.lock();
    let Some(session) = active.clone() else {
        return;
    };
    session.publish_parameter_values(parameters);
}
/// Forwards `status` to the active session; does nothing when no session is running.
pub fn publish_status(&self, status: super::Status) {
    // The guard stays alive for the whole call, so the session slot remains locked
    // while the session publishes.
    let active = self.session.lock();
    let Some(session) = active.clone() else {
        return;
    };
    session.publish_status(status);
}
/// Asks the active session to remove the statuses identified by `status_ids`;
/// does nothing when no session is running.
pub fn remove_status(&self, status_ids: Vec<String>) {
    // The guard stays alive for the whole call, so the session slot remains locked
    // while the session handles the removal.
    let active = self.session.lock();
    let Some(session) = active.clone() else {
        return;
    };
    session.remove_status(status_ids);
}
/// Replaces the connection graph with `replacement_graph`.
///
/// With an active session the replacement is delegated to the session; otherwise it
/// is applied directly to this connection's graph via `update`.
///
/// # Errors
///
/// Returns `FoxgloveError::ConnectionGraphNotSupported` when
/// `Capability::ConnectionGraph` is not among the configured capabilities.
pub fn replace_connection_graph(
&self,
replacement_graph: ConnectionGraph,
) -> std::result::Result<(), FoxgloveError> {
if !self.has_capability(Capability::ConnectionGraph) {
return Err(FoxgloveError::ConnectionGraphNotSupported);
}
// NOTE(review): the session lock guard is a temporary of the `if let` scrutinee, so it
// is held while the session handles the replacement. Whether it is still held in the
// `else` branch depends on the edition's `if let` temporary-scope rules — confirm before
// restructuring.
if let Some(session) = self.session.lock().clone() {
session.replace_connection_graph(replacement_graph);
} else {
self.connection_graph.lock().update(replacement_graph);
}
Ok(())
}
fn set_status(&self, status: ConnectionStatus) {
let prev = self.status.swap(status as u8, Ordering::Relaxed);
if prev != status as u8
&& let Some(listener) = &self.listener
{
listener.on_connection_status_changed(status);
}
}
/// Returns the established remote access session ID, if one has been set.
fn remote_access_session_id(&self) -> Option<&str> {
    let established = self.remote_access_session_id.get();
    established.map(|id| id.as_str())
}
/// Returns the cached device context, creating it on first use.
///
/// Initialization builds an API client from the device token (honoring the optional
/// base URL and timeout) and fetches the device info. On failure nothing is cached,
/// so a later call tries again.
async fn get_or_init_device_context(&self) -> Result<&DeviceContext> {
    self.device_context
        .get_or_try_init(|| async {
            let token = DeviceToken::new(self.device_token.clone());
            let builder = FoxgloveApiClientBuilder::new(token);
            let builder = match &self.foxglove_api_url {
                Some(url) => builder.base_url(url),
                None => builder,
            };
            let builder = match self.foxglove_api_timeout {
                Some(timeout) => builder.timeout(timeout),
                None => builder,
            };
            let client = Arc::new(builder.build()?);
            let device = client.fetch_device_info().await?;
            info!(device_id = %device.id, device_name = %device.name, "device context initialized");
            Ok::<_, Box<RemoteAccessError>>(DeviceContext { device, client })
        })
        .await
}
/// Joins the room described by `wake_signal` and builds a session around it.
///
/// The first wake that carries a session ID establishes this connection's session ID;
/// a later wake with a different ID only produces a warning and the established ID is
/// kept. Returns the new session together with the room's event receiver.
///
/// # Errors
///
/// Propagates the error from `Room::connect`.
async fn connect_session(
    &self,
    wake_signal: WakeSignal,
) -> Result<(Arc<RemoteAccessSession>, UnboundedReceiver<RoomEvent>)> {
    let WakeSignal {
        wake,
        device_wait_for_viewer,
    } = wake_signal;

    if let Some(wake_id) = wake.remote_access_session_id.as_ref() {
        let established = self
            .remote_access_session_id
            .get_or_init(|| wake_id.clone());
        if established != wake_id {
            warn!(
                remote_access_session_id = established.as_str(),
                wake_remote_access_session_id = wake_id.as_str(),
                "wake remote access session ID differs from established ID; keeping established ID"
            );
        }
    }

    let session_id = self.remote_access_session_id();
    info!(
        remote_access_session_id = session_id,
        url = wake.url.as_str(),
        "connecting to room"
    );
    let (room, room_events) =
        Room::connect(&wake.url, &wake.token, RoomOptions::default()).await?;
    info!(remote_access_session_id = session_id, "connected to room");

    // An absent session ID is reported to viewers as an empty string.
    let server_info = self.create_server_info(session_id.unwrap_or(""));
    let supported_encodings = self.supported_encodings.clone().unwrap_or_default();
    let message_backlog_size = self
        .message_backlog_size
        .unwrap_or(DEFAULT_MESSAGE_BACKLOG_SIZE);

    let session = RemoteAccessSession::new(SessionParams {
        room,
        context: self.context.clone(),
        channel_filter: self.channel_filter.clone(),
        qos_classifier: self.qos_classifier.clone(),
        listener: self.listener.clone(),
        capabilities: self.capabilities.clone(),
        supported_encodings,
        runtime: self.runtime.clone(),
        cancellation_token: self.cancellation_token.child_token(),
        message_backlog_size,
        services: self.services.clone(),
        connection_graph: self.connection_graph.clone(),
        remote_access_session_id: session_id.map(str::to_string),
        fetch_asset_handler: self.fetch_asset_handler.clone(),
        parameter_handler: self.parameter_handler.clone(),
        server_info,
        device_wait_for_viewer: Some(device_wait_for_viewer),
    });
    Ok((session, room_events))
}
/// Spawns the connection's run loop on the configured runtime and returns its handle.
pub fn spawn_run_until_cancelled(self: Arc<Self>) -> JoinHandle<()> {
    // Clone the handle first: `self` is moved into the spawned future.
    let runtime = self.runtime.clone();
    runtime.spawn(self.run_until_cancelled())
}
/// Runs watch/session cycles until the cancellation token fires, then walks the
/// status through `ShuttingDown` and `Shutdown`.
async fn run_until_cancelled(self: Arc<Self>) {
    // The stored status already starts as `Connecting`, so `set_status` would not
    // notify; tell the listener about the initial state directly.
    if let Some(listener) = self.listener.as_ref() {
        listener.on_connection_status_changed(ConnectionStatus::Connecting);
    }

    loop {
        if self.cancellation_token.is_cancelled() {
            break;
        }
        self.run().await;
    }

    for status in [ConnectionStatus::ShuttingDown, ConnectionStatus::Shutdown] {
        self.set_status(status);
    }
}
/// Performs one cycle: waits for a wake and, if one arrives, runs a session for it.
async fn run(&self) {
    if let Some(wake_signal) = self.watch_until_wake().await {
        self.run_session(wake_signal).await;
    }
}
/// Waits for a wake, giving up with `None` as soon as the connection is cancelled.
async fn watch_until_wake(&self) -> Option<WakeSignal> {
    let cancelled = self.cancellation_token.cancelled();
    let wake = self.watch_until_wake_inner();
    // `biased` polls the branches in the order written, so cancellation is checked first.
    tokio::select! {
        biased;
        () = cancelled => None,
        result = wake => result,
    }
}
/// Runs the watch loop until a wake arrives or the loop is told to stop.
///
/// Obtains the device context, then repeatedly opens a watch, runs it to completion and
/// passes the outcome to `on_outcome`, acting on the returned `WatchAction`.
///
/// Returns `Some(WakeSignal)` on `WatchAction::Wake`. Returns `None` when the device
/// context or the watch connection helper returns `None`, or on `WatchAction::Stop` /
/// `WatchAction::StopUnauthorized` (the latter also cancels the connection).
async fn watch_until_wake_inner(&self) -> Option<WakeSignal> {
let device_context = self.device_context_until_ok().await?;
// Retry state is shared between connecting and outcome handling for this whole loop.
let mut retry = WatchRetryState::new();
loop {
let watch = self.connect_watch(device_context, &mut retry).await?;
// Capture these before `watch.run()`, which is called on `watch` by value.
let watch_lease_id = watch.lease_id().to_string();
let device_wait_for_viewer = watch.device_wait_for_viewer();
let heartbeat_interval = watch.heartbeat_interval();
// An open watch is what this connection reports as `Connected`.
self.set_status(ConnectionStatus::Connected);
let (outcome, watch_duration) = watch.run().await;
match on_outcome(
outcome,
watch_lease_id,
watch_duration,
heartbeat_interval,
&mut retry,
) {
WatchAction::Wake(wake) => {
return Some(WakeSignal {
wake,
device_wait_for_viewer,
});
}
// Reconnect immediately: fall through to the next loop iteration without sleeping.
WatchAction::Reconnect => {
}
WatchAction::Backoff { delay } => {
self.set_status(ConnectionStatus::Connecting);
tokio::time::sleep(delay).await;
}
WatchAction::StopUnauthorized => {
self.cancellation_token.cancel();
return None;
}
WatchAction::Stop => return None,
}
}
}
async fn device_context_until_ok(&self) -> Option<&DeviceContext> {
let mut backoff = INITIAL_BACKOFF;
let device_context = loop {
match self.get_or_init_device_context().await {
Ok(ctx) => break ctx,
Err(e) => {
if e.is_unauthorized() {
error!(error = %e, "device token unauthorized; stopping remote access gateway");
self.cancellation_token.cancel();
return None;
}
warn!(error = %e, "failed to initialize device context; retrying");
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(MAX_BACKOFF);
}
}
};
Some(device_context)
}
async fn connect_watch(
&self,
device_context: &DeviceContext,
retry: &mut WatchRetryState,
) -> Option<Watch> {
loop {
let query = WatchQuery {
protocol_version: Some(REMOTE_ACCESS_PROTOCOL_VERSION.to_string()),
remote_access_session_id: self.remote_access_session_id().map(str::to_string),
previous_watch_lease_id: retry.previous_lease_id().map(str::to_string),
};
match Watch::connect(device_context.client.clone(), query).await {
Ok(watch) => {
on_connect_success(retry);
return Some(watch);
}
Err(e) => {
match on_connect_error(&e, retry) {
ConnectAction::StopUnauthorized => {
error!(error = %e, "device token unauthorized; stopping remote access gateway");
self.cancellation_token.cancel();
return None;
}
ConnectAction::RetryAfter(delay) => {
self.set_status(ConnectionStatus::Connecting);
tokio::time::sleep(delay).await;
}
}
}
}
}
}
/// Joins the room for `wake_signal`, runs the session until it ends or the connection
/// is cancelled, then tears the session down.
///
/// Steps: connect (abandoned if cancellation fires first), publish the session in
/// `self.session`, register it as a sink on the context, start the video metadata
/// watcher, add participants already in the room, run the event loop, and finally
/// clean up in reverse.
async fn run_session(&self, wake_signal: WakeSignal) {
// `biased` polls branches in the order written: the connect future is polled before
// the cancellation future.
let result = tokio::select! {
biased;
result = self.connect_session(wake_signal) => result,
() = self.cancellation_token.cancelled() => return,
};
let (session, room_events) = match result {
Ok(pair) => pair,
Err(e) => {
error!(error = %e, "failed to join room after wake");
self.set_status(ConnectionStatus::Connecting);
return;
}
};
let remote_access_session_id = self.remote_access_session_id();
// Make the session visible to the publish/advertise methods on this connection.
*self.session.lock() = Some(session.clone());
let Some(context) = self.context.upgrade() else {
// NOTE(review): this path logs "stopping" but does not cancel `cancellation_token`,
// so the loop in `run_until_cancelled` will call `run` again — confirm this is intended.
info!(
remote_access_session_id,
"context has been dropped, stopping remote access connection"
);
*self.session.lock() = None;
if let Err(e) = session.room().close().await {
error!(remote_access_session_id, error = %e, "failed to close room: {e}");
}
return;
};
context.add_sink(session.clone());
// Spawned with `tokio::spawn` (the ambient runtime), not through `self.runtime`.
let video_metadata_task = tokio::spawn(RemoteAccessSession::run_video_metadata_watcher(
session.clone(),
));
// Participants that were already in the room when we joined are handled here, each
// one subject to the protocol version check.
for (identity, participant) in session.room().remote_participants() {
let Some(version) = protocol_version::check_participant_protocol_version(
&identity,
&participant.attributes(),
remote_access_session_id,
) else {
// Version check failed: tell the participant and skip adding it.
session
.send_incompatible_version_error(&identity, &participant.attributes())
.await;
continue;
};
info!(
remote_access_session_id,
participant_identity = %identity,
version = %version,
"adding existing participant"
);
let sid = participant.sid();
let joined_at = participant.joined_at();
// A failure to add one participant is logged and does not abort the session.
if let Err(e) = session
.add_participant(identity.clone(), sid, joined_at)
.await
{
error!(
remote_access_session_id,
error = %e,
"failed to add existing participant {identity}: {e}"
);
}
}
info!(remote_access_session_id, "running remote access server");
// Run until the first of: cancellation, the room event handler returning, or the
// periodic stats logger returning.
tokio::select! {
() = self.cancellation_token.cancelled() => (),
_ = session.handle_room_events(room_events) => {},
_ = session.log_periodic_stats() => {},
}
if self.cancellation_token.is_cancelled() {
self.set_status(ConnectionStatus::ShuttingDown);
}
// Teardown: hide the session from the publish/advertise methods first.
*self.session.lock() = None;
{
// Drop all connection graph subscribers and notify the listener if there were any.
// NOTE(review): the listener is invoked while the graph lock is held; a listener
// that calls back into `replace_connection_graph` on this connection would try to
// take the same lock (the session slot is already `None`) — verify against the
// listener contract.
let mut graph = self.connection_graph.lock();
let had_subscribers = graph.has_subscribers();
graph.clear_subscribers();
if had_subscribers {
if let Some(listener) = &self.listener {
listener.on_connection_graph_unsubscribe();
}
}
}
context.remove_sink(session.sink_id());
session.cancel();
// Wait for the metadata watcher task; an `Err` here is the task's join error.
if let Err(e) = video_metadata_task.await {
error!(
remote_access_session_id,
error = %e,
"video metadata watcher failed"
);
}
info!(remote_access_session_id, "disconnecting from room");
session.close().await;
}
/// Builds the `ServerInfo` advertised for a session.
///
/// The name is the configured name, else the fetched device name, else empty. The
/// metadata is the configured `server_info` map plus an `fg-library` entry holding the
/// library version. Supported encodings are attached only when configured.
fn create_server_info(&self, remote_access_session_id: &str) -> ServerInfo {
    let mut metadata = self.server_info.clone().unwrap_or_default();
    metadata.insert("fg-library".into(), get_library_version());

    let name = match &self.name {
        Some(configured) => configured.clone(),
        None => self
            .device_context
            .get()
            .map(|ctx| ctx.device.name.clone())
            .unwrap_or_default(),
    };

    let capabilities = self
        .capabilities
        .iter()
        .flat_map(|c| c.as_protocol_capabilities())
        .copied();

    let info = ServerInfo::new(name)
        .with_session_id(remote_access_session_id)
        .with_capabilities(capabilities)
        .with_metadata(metadata);

    match self.supported_encodings.clone() {
        Some(encodings) => info.with_supported_encodings(encodings),
        None => info,
    }
}
/// Returns `true` when `cap` is among the configured capabilities.
fn has_capability(&self, cap: Capability) -> bool {
    self.capabilities.iter().any(|configured| *configured == cap)
}
/// Registers `new_services` and advertises them to the active session, if any.
///
/// The call is all-or-nothing: every service is validated before any is inserted.
///
/// # Errors
///
/// - `FoxgloveError::ServicesNotSupported` when `Capability::Services` is not configured.
/// - `FoxgloveError::DuplicateService` when a name repeats within `new_services`, or a
///   name or ID is already registered.
/// - `FoxgloveError::MissingRequestEncoding` when a service declares no request encoding
///   and no non-empty set of supported encodings is configured.
pub(super) fn add_services(
    &self,
    new_services: Vec<Service>,
) -> std::result::Result<(), FoxgloveError> {
    if !self.has_capability(Capability::Services) {
        return Err(FoxgloveError::ServicesNotSupported);
    }
    if new_services.is_empty() {
        return Ok(());
    }

    // Validate the batch on its own before touching the shared registry.
    let encodings_available =
        matches!(&self.supported_encodings, Some(set) if !set.is_empty());
    let mut seen = HashMap::with_capacity(new_services.len());
    for service in &new_services {
        let repeated = seen
            .insert(service.name().to_string(), service.id())
            .is_some();
        if repeated {
            return Err(FoxgloveError::DuplicateService(service.name().to_string()));
        }
        let lacks_encoding = service.request_encoding().is_none();
        if lacks_encoding && !encodings_available {
            return Err(FoxgloveError::MissingRequestEncoding(
                service.name().to_string(),
            ));
        }
    }

    let new_service_ids: Vec<ServiceId> = {
        // The write lock spans both the conflict check and the inserts.
        let mut registry = self.services.write();
        let conflict = new_services
            .iter()
            .find(|s| registry.contains_name(s.name()) || registry.contains_id(s.id()));
        if let Some(service) = conflict {
            return Err(FoxgloveError::DuplicateService(service.name().to_string()));
        }
        new_services
            .into_iter()
            .map(|service| {
                let id = service.id();
                registry.insert(service);
                id
            })
            .collect()
    };

    let active = self.session.lock();
    if let Some(session) = active.as_ref() {
        session.advertise_new_services(&new_service_ids);
    }
    Ok(())
}
/// Removes the named services and unadvertises the ones that were actually registered.
///
/// Unknown names are ignored. Nothing is sent to the session when no service was removed.
pub(super) fn remove_services(&self, names: impl IntoIterator<Item = impl AsRef<str>>) {
    let mut removed_ids: Vec<ServiceId> = Vec::new();
    {
        // Release the registry lock before touching the session.
        let mut registry = self.services.write();
        for name in names {
            if let Some(service) = registry.remove_by_name(name) {
                removed_ids.push(service.id());
            }
        }
    }

    if !removed_ids.is_empty() {
        let active = self.session.lock();
        if let Some(session) = active.as_ref() {
            session.unadvertise_services(&removed_ids);
        }
    }
}
/// Requests shutdown by cancelling the connection's cancellation token.
pub(super) fn shutdown(&self) {
    let token = &self.cancellation_token;
    token.cancel();
}
}