use super::ProviderSession;
use anyhow::Error;
use derive_more::From;
use meio::{Action, Address, Interaction};
use meio_connect::client::WsSender;
use rill_protocol::io::client::{ClientProtocol, ClientReqId, ClientResponse};
use rill_protocol::io::provider::{Path, ProviderReqId};
use rill_protocol::io::transport::WideEnvelope;
use std::collections::hash_map::{Entry, HashMap};
use thiserror::Error;
pub(super) type ClientSender = WsSender<WideEnvelope<ClientProtocol, ClientResponse>>;
#[derive(Debug, From)]
pub struct ProviderLink {
address: Address<ProviderSession>,
}
impl ProviderLink {
pub fn bind(&self, sender: ClientSender) -> BindedProviderLink {
BindedProviderLink {
sender,
address: self.address.clone(),
subscriptions: HashMap::new(),
}
}
}
#[derive(Debug, From)]
pub struct BindedProviderLink {
sender: ClientSender,
address: Address<ProviderSession>,
subscriptions: HashMap<(ClientReqId, Path), ProviderReqId>,
}
pub(super) struct SubscribeToPath {
pub path: Path,
pub direct_id: ClientReqId,
pub sender: ClientSender,
}
impl Interaction for SubscribeToPath {
type Output = ProviderReqId;
}
impl BindedProviderLink {
pub async fn subscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
let key = (direct_id, path.clone());
match self.subscriptions.entry(key) {
Entry::Vacant(entry) => {
let sender = self.sender.clone();
let msg = SubscribeToPath {
path,
direct_id,
sender,
};
let direct_id = self.address.interact(msg).recv().await?;
entry.insert(direct_id);
Ok(())
}
Entry::Occupied(_entry) => Err(Reason::AlreadySubscribed(path).into()),
}
}
}
pub(super) struct UnsubscribeFromPath {
pub path: Path,
pub direct_id: ProviderReqId,
}
impl Action for UnsubscribeFromPath {}
impl BindedProviderLink {
pub async fn unsubscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
let key = (direct_id, path);
if let Some(req_id) = self.subscriptions.remove(&key) {
let msg = UnsubscribeFromPath {
path: key.1,
direct_id: req_id,
};
self.address.act(msg).await?;
}
Ok(())
}
pub async fn unsubscribe_all(&mut self) {
let paths: Vec<_> = self.subscriptions.keys().cloned().collect();
for (direct_id, path) in paths {
if let Err(err) = self.unsubscribe(path, direct_id).await {
log::error!("Unsubscribing all partially failed: {}", err);
}
}
}
}
#[derive(Debug, Error)]
enum Reason {
#[error("Already subscribed {0}")]
AlreadySubscribed(Path),
}