use std::sync::Arc;
use tokio::sync::broadcast;
use crate::auth::Auth;
use crate::connection::{Connection, ConnectionManager};
use crate::options::ClientOptions;
use crate::realtime_channel::Channels;
use crate::rest::Rest;
#[allow(unused_imports)]
use crate::Result;
pub struct Realtime {
pub connection: Connection,
pub channels: Channels,
rest: Rest,
_manager_handle: tokio::task::JoinHandle<()>,
_router_handle: tokio::task::JoinHandle<()>,
}
impl Realtime {
pub fn new(key: &str) -> Result<Self> {
let opts = ClientOptions::new(key);
Self::from_options(opts)
}
pub fn from_options(opts: ClientOptions) -> Result<Self> {
let rest = opts.rest()?;
let rt_opts = Arc::new(ClientOptions::from_rest_opts(rest.options()));
let channel_retry_timeout = rt_opts.channel_retry_timeout;
let (connection, channel_msg_rx, manager) = ConnectionManager::new(rt_opts);
let channels = Channels::new(connection.clone(), channel_retry_timeout);
let manager_handle = tokio::spawn(manager.run());
let router_handle = {
let channels_ref = channels.clone_inner();
let mut channel_msg_rx = channel_msg_rx;
let mut state_rx = connection.on_state_change();
tokio::spawn(async move {
loop {
tokio::select! {
msg = channel_msg_rx.recv() => {
match msg {
Some(pm) => {
channels_ref.process_channel_message(pm).await;
}
None => break,
}
}
change = state_rx.recv() => {
match change {
Ok(change) => {
channels_ref.propagate_connection_state(&change).await;
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
})
};
Ok(Self {
connection,
channels,
rest,
_manager_handle: manager_handle,
_router_handle: router_handle,
})
}
pub fn auth(&self) -> Auth<'_> {
self.rest.auth()
}
pub async fn close(&self) {
self.connection.close();
}
}