rill_server/actors/provider_session/
link.rs

1use super::ProviderSession;
2use anyhow::Error;
3use derive_more::From;
4use meio::{Action, Address, Interaction};
5use meio_connect::client::WsSender;
6use rill_protocol::io::client::{ClientProtocol, ClientReqId, ClientResponse};
7use rill_protocol::io::provider::{Path, ProviderReqId};
8use rill_protocol::io::transport::WideEnvelope;
9use std::collections::hash_map::{Entry, HashMap};
10use thiserror::Error;
11
12pub(super) type ClientSender = WsSender<WideEnvelope<ClientProtocol, ClientResponse>>;
13
14#[derive(Debug, From)]
15pub struct ProviderLink {
16    address: Address<ProviderSession>,
17}
18
19impl ProviderLink {
20    pub fn bind(&self, sender: ClientSender) -> BindedProviderLink {
21        BindedProviderLink {
22            sender,
23            address: self.address.clone(),
24            subscriptions: HashMap::new(),
25        }
26    }
27}
28
29#[derive(Debug, From)]
30pub struct BindedProviderLink {
31    sender: ClientSender,
32    address: Address<ProviderSession>,
33    subscriptions: HashMap<(ClientReqId, Path), ProviderReqId>,
34}
35
36pub(super) struct SubscribeToPath {
37    pub path: Path,
38    pub direct_id: ClientReqId,
39    pub sender: ClientSender,
40}
41
42impl Interaction for SubscribeToPath {
43    type Output = ProviderReqId;
44}
45
46impl BindedProviderLink {
47    pub async fn subscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
48        let key = (direct_id, path.clone());
49        match self.subscriptions.entry(key) {
50            Entry::Vacant(entry) => {
51                let sender = self.sender.clone();
52                let msg = SubscribeToPath {
53                    path,
54                    direct_id,
55                    sender,
56                };
57                let direct_id = self.address.interact(msg).recv().await?;
58                entry.insert(direct_id);
59                Ok(())
60            }
61            Entry::Occupied(_entry) => Err(Reason::AlreadySubscribed(path).into()),
62        }
63    }
64}
65
66pub(super) struct UnsubscribeFromPath {
67    pub path: Path,
68    pub direct_id: ProviderReqId,
69}
70
71impl Action for UnsubscribeFromPath {}
72
73impl BindedProviderLink {
74    pub async fn unsubscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
75        let key = (direct_id, path);
76        if let Some(req_id) = self.subscriptions.remove(&key) {
77            let msg = UnsubscribeFromPath {
78                path: key.1,
79                direct_id: req_id,
80            };
81            self.address.act(msg).await?;
82        }
83        Ok(())
84    }
85
86    pub async fn unsubscribe_all(&mut self) {
87        let paths: Vec<_> = self.subscriptions.keys().cloned().collect();
88        for (direct_id, path) in paths {
89            if let Err(err) = self.unsubscribe(path, direct_id).await {
90                log::error!("Unsubscribing all partially failed: {}", err);
91            }
92        }
93    }
94}
95
96#[derive(Debug, Error)]
97enum Reason {
98    #[error("Already subscribed {0}")]
99    AlreadySubscribed(Path),
100    /*
101    #[error("Never subscribed {0}")]
102    NeverSubscribed(Path),
103    */
104}
105
106/*
107/// It's not cloneable, because it tracks subscriptions.
108#[derive(Debug)]
109pub struct ProviderSessionLink {
110    address: Address<ProviderSession>,
111    subscriptions: HashMap<Path, ProviderReqId>,
112}
113
114impl From<Address<ProviderSession>> for ProviderSessionLink {
115    fn from(address: Address<ProviderSession>) -> Self {
116        Self {
117            address,
118            subscriptions: HashMap::new(),
119        }
120    }
121}
122
123pub(super) struct NewRequest {
124    pub request: ServerToProvider,
125}
126
127impl Interaction for NewRequest {
128    type Output = ProviderReqId;
129}
130
131impl ProviderSessionLink {
132    pub async fn subscribe(&mut self, path: Path) -> Result<(), Error> {
133        match self.subscriptions.entry(path.clone()) {
134            Entry::Vacant(entry) => {
135                let request = ServerToProvider::ControlStream { active: true, path };
136                let msg = NewRequest { request };
137                let direct_id = self.address.interact_and_wait(msg).await?;
138                entry.insert(direct_id);
139                Ok(())
140            }
141            Entry::Occupied(_entry) => Err(Reason::AlreadySubscribed(path).into()),
142        }
143    }
144}
145
146pub(super) struct SubRequest {
147    pub direct_id: ProviderReqId,
148    pub request: ServerToProvider,
149}
150
151impl Action for SubRequest {}
152
153impl ProviderSessionLink {
154    // TODO: Move to the separate link
155    // TODO: Add id of the stream (returned before by subscribe call)
156    pub async fn unsubscribe(&mut self, path: Path) -> Result<(), Error> {
157        if let Some(direct_id) = self.subscriptions.remove(&path) {
158            let request = ServerToProvider::ControlStream {
159                active: false,
160                path,
161            };
162            let msg = SubRequest { direct_id, request };
163            self.address.act(msg).await
164        } else {
165            Err(Reason::NeverSubscribed(path).into())
166        }
167    }
168
169    pub async fn unsubscribe_all(&mut self) -> Result<(), Error> {
170        let paths: Vec<_> = self.subscriptions.keys().cloned().collect();
171        for path in paths {
172            self.unsubscribe(path).await?;
173        }
174        Ok(())
175    }
176}
177*/