#![warn(rust_2018_idioms)]
#![allow(missing_docs)]
use rvoip_core_traits::ids::{ConnectionId, ConversationId, MessageId, SessionId};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{mpsc, RwLock};
/// SIP transport surface; compiled only when the `sip` Cargo feature is enabled.
#[cfg(feature = "sip")]
pub mod sip {
    // Only the `api` module of `rvoip_sip` is exposed, reachable as `sip::api`.
    pub use rvoip_sip::api;
}
/// WebRTC transport surface; compiled only when the `webrtc` Cargo feature is enabled.
#[cfg(feature = "webrtc")]
pub mod webrtc {
    // Glob re-export: every public item of `rvoip_webrtc` is reachable under `webrtc::`.
    pub use rvoip_webrtc::*;
}
/// UCTP transport surface; compiled only when the `uctp` Cargo feature is enabled.
#[cfg(feature = "uctp")]
pub mod uctp {
    // Glob re-export: every public item of `rvoip_uctp` is reachable under `uctp::`.
    pub use rvoip_uctp::*;
}
/// Authentication material handed to [`Client::connect`].
///
/// `Debug` is implemented by hand rather than derived so that token material
/// is never written to logs or panic messages: every secret field is rendered
/// as the placeholder `"<redacted>"`.
#[derive(Clone)]
pub enum Credential {
    /// A bearer token, carried as an opaque string.
    Bearer(String),
    /// An OAuth 2.0 access token together with its DPoP proof.
    OAuth2Dpop {
        /// The access token string.
        access_token: String,
        /// The DPoP proof string accompanying `access_token`.
        dpop_proof: String,
    },
}

impl std::fmt::Debug for Credential {
    /// Prints the variant (and field names) while hiding every secret value.
    // `std::fmt::Result` is spelled out in full because this module declares
    // its own `Result` alias.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        match self {
            Credential::Bearer(_) => f.debug_tuple("Bearer").field(&REDACTED).finish(),
            Credential::OAuth2Dpop { .. } => f
                .debug_struct("OAuth2Dpop")
                .field("access_token", &REDACTED)
                .field("dpop_proof", &REDACTED)
                .finish(),
        }
    }
}
/// Ways of addressing the callee passed to [`Client::call`].
///
/// Each variant wraps a plain string; the expected format of that string is
/// not defined in this file.
#[derive(Debug, Clone)]
pub enum CallTarget {
    /// Callee addressed by an identity string.
    Identity(String),
    /// Callee addressed by a participant string.
    Participant(String),
    /// Callee addressed by a URI string.
    Uri(String),
}
/// Kind of media requested for a session in [`Client::call`].
///
/// The enum carries no data, so it is `Copy` and supports equality and
/// hashing; callers can compare media kinds or use them as map/set keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SessionMedium {
    /// Audio only.
    Voice,
    /// Video only.
    Video,
    /// Audio and video together.
    VoiceVideo,
    /// Text chat.
    TextChat,
    /// Screen sharing.
    ScreenShare,
}
/// Errors returned by the client API in this module.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The URI scheme is unknown, or the transport behind it was not compiled in.
    /// The payload is the offending scheme string.
    #[error("transport not enabled for scheme: {0}")]
    UnsupportedScheme(String),

    /// Establishing the connection failed; the payload describes why.
    #[error("connection failed: {0}")]
    ConnectFailed(String),

    /// No session matched the request.
    #[error("session not found")]
    SessionNotFound,

    /// The named operation is a stub; the payload is the operation's name.
    #[error("not implemented: {0}")]
    NotImplemented(&'static str),
}
/// Result alias for this module with the error type fixed to [`ClientError`].
///
/// Note that it shadows the prelude `Result` inside this module.
pub type Result<T> = std::result::Result<T, ClientError>;
/// Events delivered to the application through the receiver returned by
/// [`Client::incoming`].
#[derive(Debug)]
pub enum InboundEvent {
    /// A session is being offered; the handle can be accepted or rejected.
    IncomingSession(SessionHandle),

    /// A message arrived in a conversation.
    Message {
        /// Conversation the message belongs to.
        conversation_id: ConversationId,
        /// Identifier of the message itself.
        message_id: MessageId,
        /// Sender, as a plain string (format not defined in this file).
        from: String,
        /// Message body.
        body: String,
    },

    /// The assurance associated with a connection changed.
    AssuranceChanged {
        /// Connection whose assurance changed.
        connection_id: ConnectionId,
        /// New assurance value, as a plain string (format not defined in this file).
        new_assurance: String,
    },

    /// The client was disconnected.
    Disconnected {
        /// Reason for the disconnect.
        reason: String,
    },
}
/// Handle to a single session, identified by a session id and the id of the
/// conversation it belongs to.
#[derive(Debug)]
pub struct SessionHandle {
    /// Identifier of this session.
    session_id: SessionId,
    /// Identifier of the conversation this session belongs to.
    conversation_id: ConversationId,
    // Shared per-session state. Nothing in this file reads it yet, hence the
    // leading underscore that silences the unused-field lint.
    _inner: Arc<RwLock<SessionInner>>,
}
/// State stored behind the `Arc<RwLock<..>>` inside [`SessionHandle`].
#[derive(Debug, Default)]
struct SessionInner {
    // Not read anywhere in this file yet; the `allow` keeps the build free of
    // dead-code warnings in the meantime.
    #[allow(dead_code)]
    held: bool,
}
impl SessionHandle {
    /// Builds the error returned by every operation that is still a stub.
    fn not_implemented<T>(operation: &'static str) -> Result<T> {
        Err(ClientError::NotImplemented(operation))
    }

    /// Identifier of this session.
    pub fn session_id(&self) -> &SessionId {
        let Self { session_id, .. } = self;
        session_id
    }

    /// Identifier of the conversation this session belongs to.
    pub fn conversation_id(&self) -> &ConversationId {
        let Self {
            conversation_id, ..
        } = self;
        conversation_id
    }

    /// Accepts the session.
    ///
    /// Currently this only hands the handle back unchanged and always succeeds.
    pub async fn accept(self) -> Result<Self> {
        Ok(self)
    }

    /// Rejects the session. The reason is currently ignored.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn reject(self, _reason: &str) -> Result<()> {
        Self::not_implemented("SessionHandle::reject")
    }

    /// Ends the session.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn end(&self) -> Result<()> {
        Self::not_implemented("SessionHandle::end")
    }

    /// Puts the session on hold.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn hold(&self) -> Result<()> {
        Self::not_implemented("SessionHandle::hold")
    }

    /// Resumes a held session.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn resume(&self) -> Result<()> {
        Self::not_implemented("SessionHandle::resume")
    }

    /// Mutes the session.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn mute(&self) -> Result<()> {
        Self::not_implemented("SessionHandle::mute")
    }

    /// Sends DTMF digits. The digits are currently ignored.
    ///
    /// # Errors
    /// Always returns [`ClientError::NotImplemented`].
    pub async fn send_dtmf(&self, _digits: &str) -> Result<()> {
        Self::not_implemented("SessionHandle::send_dtmf")
    }
}
/// Client bound to one server URI, owning both ends of the inbound-event channel.
pub struct Client {
    /// URI given to [`Client::connect`], stored verbatim.
    server_uri: String,
    /// Sending half of the inbound-event channel; used by `deliver`.
    inbound_tx: mpsc::Sender<InboundEvent>,
    /// Receiving half of the inbound-event channel. It sits in `Mutex<Option<..>>`
    /// so that `incoming(&self)` can move it out exactly once.
    inbound_rx: tokio::sync::Mutex<Option<mpsc::Receiver<InboundEvent>>>,
}
impl Client {
pub async fn connect(server_uri: &str, _credential: Credential) -> Result<Self> {
let scheme = server_uri.split("://").next().unwrap_or("");
match scheme {
"uctp+quic" => {
#[cfg(not(feature = "uctp"))]
{
return Err(ClientError::UnsupportedScheme(scheme.into()));
}
#[cfg(feature = "uctp")]
{
let (tx, rx) = mpsc::channel(64);
Ok(Self {
server_uri: server_uri.into(),
inbound_tx: tx,
inbound_rx: tokio::sync::Mutex::new(Some(rx)),
})
}
}
"sip" => {
#[cfg(not(feature = "sip"))]
{
Err(ClientError::UnsupportedScheme(scheme.into()))
}
#[cfg(feature = "sip")]
{
let (tx, rx) = mpsc::channel(64);
Ok(Self {
server_uri: server_uri.into(),
inbound_tx: tx,
inbound_rx: tokio::sync::Mutex::new(Some(rx)),
})
}
}
"wss" | "https" => {
#[cfg(not(feature = "webrtc"))]
{
Err(ClientError::UnsupportedScheme(scheme.into()))
}
#[cfg(feature = "webrtc")]
{
let (tx, rx) = mpsc::channel(64);
Ok(Self {
server_uri: server_uri.into(),
inbound_tx: tx,
inbound_rx: tokio::sync::Mutex::new(Some(rx)),
})
}
}
other => Err(ClientError::UnsupportedScheme(other.into())),
}
}
pub fn server_uri(&self) -> &str {
&self.server_uri
}
pub async fn call(
&self,
_target: CallTarget,
_medium: SessionMedium,
) -> Result<SessionHandle> {
Ok(SessionHandle {
session_id: SessionId::new(),
conversation_id: ConversationId::new(),
_inner: Arc::new(RwLock::new(SessionInner::default())),
})
}
pub async fn send_message(
&self,
_cid: ConversationId,
_body: &str,
) -> Result<MessageId> {
Err(ClientError::NotImplemented("Client::send_message"))
}
pub fn incoming(&self) -> Option<mpsc::Receiver<InboundEvent>> {
self.inbound_rx.try_lock().ok().and_then(|mut g| g.take())
}
#[doc(hidden)]
pub async fn deliver(&self, event: InboundEvent) {
let _ = self.inbound_tx.send(event).await;
}
pub async fn close(self) -> Result<()> {
drop(self.inbound_tx);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Connects over `uctp+quic`, panicking if the connection is refused.
    // Gated like its callers so that it is not dead code when `uctp` is off.
    #[cfg(feature = "uctp")]
    async fn uctp_client(uri: &str) -> Client {
        Client::connect(uri, Credential::Bearer("test".into()))
            .await
            .expect("connect")
    }

    // The following three tests need a successful `uctp+quic` connection, which
    // is only possible when the `uctp` feature is compiled in.

    #[cfg(feature = "uctp")]
    #[tokio::test]
    async fn connect_uctp_quic_returns_client() {
        let client = uctp_client("uctp+quic://example.com:4433").await;
        assert_eq!(client.server_uri(), "uctp+quic://example.com:4433");
    }

    #[cfg(feature = "uctp")]
    #[tokio::test]
    async fn incoming_can_be_taken_once() {
        let client = uctp_client("uctp+quic://example.com").await;
        assert!(client.incoming().is_some());
        assert!(client.incoming().is_none(), "second take should be None");
    }

    #[cfg(feature = "uctp")]
    #[tokio::test]
    async fn deliver_routes_to_incoming() {
        let client = uctp_client("uctp+quic://example.com").await;
        let mut rx = client.incoming().unwrap();
        client
            .deliver(InboundEvent::Disconnected {
                reason: "test".into(),
            })
            .await;
        let event = rx.recv().await.expect("event");
        assert!(matches!(event, InboundEvent::Disconnected { .. }));
    }

    /// Without the `uctp` feature the scheme must be reported as unsupported.
    #[cfg(not(feature = "uctp"))]
    #[tokio::test]
    async fn connect_uctp_quic_without_feature_errors() {
        let result = Client::connect(
            "uctp+quic://example.com:4433",
            Credential::Bearer("test".into()),
        )
        .await;
        assert!(matches!(result, Err(ClientError::UnsupportedScheme(_))));
    }

    #[tokio::test]
    async fn connect_unknown_scheme_errors() {
        let result = Client::connect("ftp://example.com", Credential::Bearer("test".into())).await;
        assert!(matches!(result, Err(ClientError::UnsupportedScheme(_))));
    }
}