mod client;
mod server;
#[cfg(test)]
mod tests;
pub use client::RpcClientManager;
pub use server::{HandleRequestOptions, RpcServerManager};
use crate::data_stream::{StreamResult, StreamTextOptions, TextStreamInfo};
use crate::room::id::ParticipantIdentity;
use livekit_protocol::RpcError as RpcError_Proto;
use std::{error::Error, fmt::Display, future::Future, time::Duration};
// NOTE(review): the consumers of these constants live outside this file (presumably the
// `client` and `server` submodules); the descriptions below are inferred from the names
// and values only — confirm against the call sites.

/// Numeric value identifying RPC version 1.
pub(crate) const RPC_VERSION_V1: u32 = 1;
/// Numeric value identifying RPC version 2.
pub(crate) const RPC_VERSION_V2: u32 = 2;
/// Topic string associated with RPC requests.
pub(crate) const RPC_REQUEST_TOPIC: &str = "lk.rpc_request";
/// Topic string associated with RPC responses.
pub(crate) const RPC_RESPONSE_TOPIC: &str = "lk.rpc_response";
/// Attribute key for the RPC request id.
pub(crate) const ATTR_REQUEST_ID: &str = "lk.rpc_request_id";
/// Attribute key for the RPC method name.
pub(crate) const ATTR_METHOD: &str = "lk.rpc_request_method";
/// Attribute key for the response timeout; the `_MS` suffix suggests milliseconds.
pub(crate) const ATTR_RESPONSE_TIMEOUT_MS: &str = "lk.rpc_request_response_timeout_ms";
/// Attribute key for the RPC version.
pub(crate) const ATTR_VERSION: &str = "lk.rpc_request_version";
/// Transport operations the RPC layer relies on.
///
/// Implementors must be `Send + Sync`, and both async methods return `Send` futures.
/// In this file the trait is implemented by `SessionTransport`.
pub(crate) trait RpcTransport: Send + Sync {
/// Publishes a data packet; the future resolves to `Ok(())` or a `RoomError`.
fn publish_data(
&self,
data: livekit_protocol::DataPacket,
) -> impl Future<Output = Result<(), crate::room::RoomError>> + Send;
/// Sends `text` using the given stream options; the future resolves to a
/// `StreamResult<TextStreamInfo>`.
fn send_text(
&self,
text: &str,
options: StreamTextOptions,
) -> impl Future<Output = StreamResult<TextStreamInfo>> + Send;
/// Returns an `i32` protocol value for the remote participant `identity`.
/// NOTE(review): how an unknown identity is reported is not visible here — confirm
/// with the implementors.
fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32;
/// Returns the server version string, or `None` when it is not available.
fn server_version(&self) -> Option<String>;
}
/// Newtype around a shared (`Arc`) `RoomSession`; it implements `RpcTransport` by
/// delegating to that session.
pub(crate) struct SessionTransport(pub(crate) std::sync::Arc<super::RoomSession>);
impl RpcTransport for SessionTransport {
    /// Publishes `data` through the session's RTC engine.
    ///
    /// The packet is sent with `DataPacketKind::Reliable`; any engine error is converted
    /// into `RoomError` via `Into`.
    async fn publish_data(
        &self,
        data: livekit_protocol::DataPacket,
    ) -> Result<(), crate::room::RoomError> {
        self.0
            .rtc_engine
            // NOTE(review): the meaning of the trailing `false` flag is defined by the
            // engine's `publish_data` and is not visible from this file.
            .publish_data(data, crate::DataPacketKind::Reliable, false)
            .await
            .map_err(Into::into)
    }

    /// Sends `text` via the session's outgoing stream manager and returns its result
    /// unchanged.
    async fn send_text(
        &self,
        text: &str,
        options: StreamTextOptions,
    ) -> StreamResult<TextStreamInfo> {
        self.0.outgoing_stream_manager.send_text(text, options).await
    }

    /// Returns whatever the session reports as the client protocol of `identity`.
    fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 {
        self.0.get_remote_client_protocol(identity)
    }

    /// Returns the server version taken from the join response.
    ///
    /// Yields `None` when the join response carries no server info, or when the
    /// reported version string is empty.
    fn server_version(&self) -> Option<String> {
        self.0
            .rtc_engine
            .session()
            .signal_client()
            .join_response()
            .server_info
            .map(|info| info.version)
            // An empty version string means "unknown", so report it as absent. The
            // previous `is_empty().then(..)` had this check inverted and returned
            // `Some("")` only when the version was empty.
            .filter(|version| !version.is_empty())
    }
}
/// Parameters for performing an RPC call.
///
/// The `Default` implementation in this file leaves the three strings empty and sets
/// `response_timeout` to 15 seconds.
///
/// NOTE(review): the code that consumes this struct is outside this file; the field
/// descriptions are inferred from their names — confirm against the caller.
#[derive(Debug, Clone)]
pub struct PerformRpcData {
/// Identity of the destination participant (presumably the one that handles the call).
pub destination_identity: String,
/// Name of the RPC method to invoke.
pub method: String,
/// Request payload as a string.
pub payload: String,
/// Response timeout for the call.
pub response_timeout: Duration,
}
impl Default for PerformRpcData {
    /// Builds call parameters with empty identity, method and payload strings and a
    /// 15-second response timeout.
    fn default() -> Self {
        let response_timeout = Duration::from_secs(15);
        Self {
            destination_identity: String::new(),
            method: String::new(),
            payload: String::new(),
            response_timeout,
        }
    }
}
/// Data describing a single RPC invocation.
///
/// NOTE(review): this struct is constructed and consumed outside this file (presumably
/// it is what a registered method handler receives); the field descriptions are inferred
/// from their names — confirm against the server code.
#[derive(Debug, Clone)]
pub struct RpcInvocationData {
/// Identifier of the request.
pub request_id: String,
/// Identity of the calling participant.
pub caller_identity: ParticipantIdentity,
/// Request payload as a string.
pub payload: String,
/// Response timeout associated with the request.
pub response_timeout: Duration,
}
/// Error value for RPC calls: a numeric code, a message, and optional extra data.
///
/// Its `Display` output has the form `RPC Error: <message> (<code>)`. Values built through
/// `RpcError::new` have `message` and `data` truncated to `RpcError::MAX_MESSAGE_BYTES` and
/// `RpcError::MAX_DATA_BYTES` respectively; because the fields are public, values built
/// with a struct literal are not truncated.
#[derive(Debug, Clone)]
pub struct RpcError {
/// Numeric error code; built-in values are listed in `RpcErrorCode`.
pub code: u32,
/// Error message text.
pub message: String,
/// Optional additional data; `to_proto` writes `None` as an empty string.
pub data: Option<String>,
}
impl RpcError {
    /// Byte limit that [`Self::new`] applies to `message`.
    pub const MAX_MESSAGE_BYTES: usize = 256;
    /// Byte limit that [`Self::new`] applies to `data`.
    pub const MAX_DATA_BYTES: usize = MAX_V1_PAYLOAD_BYTES;

    /// Creates an error, truncating `message` to [`Self::MAX_MESSAGE_BYTES`] and `data`
    /// (when present) to [`Self::MAX_DATA_BYTES`] via `truncate_bytes`.
    pub fn new(code: u32, message: String, data: Option<String>) -> Self {
        let message = truncate_bytes(&message, Self::MAX_MESSAGE_BYTES);
        let data = data
            .as_deref()
            .map(|raw| truncate_bytes(raw, Self::MAX_DATA_BYTES));
        Self { code, message, data }
    }

    /// Converts a protocol-level error into an [`RpcError`].
    ///
    /// The proto `data` string is always wrapped in `Some`, even when it is empty, and the
    /// same truncation as in [`Self::new`] applies.
    pub fn from_proto(proto: RpcError_Proto) -> Self {
        let RpcError_Proto { code, message, data } = proto;
        Self::new(code, message, Some(data))
    }

    /// Converts this error into its protocol-level form; a missing `data` becomes an
    /// empty string.
    pub fn to_proto(&self) -> RpcError_Proto {
        let data = match &self.data {
            Some(text) => text.clone(),
            None => String::new(),
        };
        RpcError_Proto { code: self.code, message: self.message.clone(), data }
    }
}
impl Display for RpcError {
    /// Formats the error as `RPC Error: <message> (<code>)`; `data` is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        f.write_fmt(format_args!("RPC Error: {} ({})", message, code))
    }
}
// Marks `RpcError` as a standard error type; all `Error` methods keep their defaults
// (so no `source` is reported).
impl Error for RpcError {}
/// Built-in RPC error codes.
///
/// Each variant's discriminant is the numeric code used when building an `RpcError` via
/// `RpcError::built_in`, and each variant has a fixed message returned by
/// `RpcErrorCode::message` (quoted on the variants below).
#[derive(Debug, Clone, Copy)]
pub enum RpcErrorCode {
/// "Application error in method handler"
ApplicationError = 1500,
/// "Connection timeout"
ConnectionTimeout = 1501,
/// "Response timeout"
ResponseTimeout = 1502,
/// "Recipient disconnected"
RecipientDisconnected = 1503,
/// "Response payload too large"
ResponsePayloadTooLarge = 1504,
/// "Failed to send"
SendFailed = 1505,
/// "Method not supported at destination"
UnsupportedMethod = 1400,
/// "Recipient not found"
RecipientNotFound = 1401,
/// "Request payload too large"
RequestPayloadTooLarge = 1402,
/// "RPC not supported by server"
UnsupportedServer = 1403,
/// "Unsupported RPC version"
UnsupportedVersion = 1404,
}
impl RpcErrorCode {
pub(crate) fn message(&self) -> &'static str {
match self {
Self::ApplicationError => "Application error in method handler",
Self::ConnectionTimeout => "Connection timeout",
Self::ResponseTimeout => "Response timeout",
Self::RecipientDisconnected => "Recipient disconnected",
Self::ResponsePayloadTooLarge => "Response payload too large",
Self::SendFailed => "Failed to send",
Self::UnsupportedMethod => "Method not supported at destination",
Self::RecipientNotFound => "Recipient not found",
Self::RequestPayloadTooLarge => "Request payload too large",
Self::UnsupportedServer => "RPC not supported by server",
Self::UnsupportedVersion => "Unsupported RPC version",
}
}
}
impl RpcError {
    /// Builds an [`RpcError`] from a built-in code, using the code's numeric value and
    /// its fixed message, plus optional caller-supplied `data`.
    pub(crate) fn built_in(code: RpcErrorCode, data: Option<String>) -> Self {
        let numeric_code = code as u32;
        let message = String::from(code.message());
        Self::new(numeric_code, message, data)
    }
}
/// Byte limit of 15360 (15 KiB); in this file it also defines `RpcError::MAX_DATA_BYTES`.
/// NOTE(review): the name suggests it caps RPC v1 payload sizes — the enforcing code is
/// outside this file, confirm there.
pub const MAX_V1_PAYLOAD_BYTES: usize = 15360;
/// Returns the length of `s` in bytes of its UTF-8 encoding (not in characters).
pub(crate) fn byte_length(s: &str) -> usize {
    s.len()
}
/// Returns the longest prefix of `s` whose UTF-8 encoding is at most `max_bytes` bytes
/// long, without splitting a character.
///
/// If `s` already fits within `max_bytes`, an owned copy of the whole string is returned.
/// A character that would straddle the limit is dropped entirely, so the result may be
/// shorter than `max_bytes` (and is empty when even the first character does not fit).
pub(crate) fn truncate_bytes(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }
    // Here `max_bytes < s.len()`, so it is a valid index to probe. Step back to the
    // nearest char boundary (at most three steps for UTF-8); index 0 is always a
    // boundary, so the loop terminates.
    let mut end = max_bytes;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    s[..end].to_owned()
}