rill_server/actors/provider_session/link.rs
1use super::ProviderSession;
2use anyhow::Error;
3use derive_more::From;
4use meio::{Action, Address, Interaction};
5use meio_connect::client::WsSender;
6use rill_protocol::io::client::{ClientProtocol, ClientReqId, ClientResponse};
7use rill_protocol::io::provider::{Path, ProviderReqId};
8use rill_protocol::io::transport::WideEnvelope;
9use std::collections::hash_map::{Entry, HashMap};
10use thiserror::Error;
11
12pub(super) type ClientSender = WsSender<WideEnvelope<ClientProtocol, ClientResponse>>;
13
14#[derive(Debug, From)]
15pub struct ProviderLink {
16 address: Address<ProviderSession>,
17}
18
19impl ProviderLink {
20 pub fn bind(&self, sender: ClientSender) -> BindedProviderLink {
21 BindedProviderLink {
22 sender,
23 address: self.address.clone(),
24 subscriptions: HashMap::new(),
25 }
26 }
27}
28
29#[derive(Debug, From)]
30pub struct BindedProviderLink {
31 sender: ClientSender,
32 address: Address<ProviderSession>,
33 subscriptions: HashMap<(ClientReqId, Path), ProviderReqId>,
34}
35
36pub(super) struct SubscribeToPath {
37 pub path: Path,
38 pub direct_id: ClientReqId,
39 pub sender: ClientSender,
40}
41
42impl Interaction for SubscribeToPath {
43 type Output = ProviderReqId;
44}
45
46impl BindedProviderLink {
47 pub async fn subscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
48 let key = (direct_id, path.clone());
49 match self.subscriptions.entry(key) {
50 Entry::Vacant(entry) => {
51 let sender = self.sender.clone();
52 let msg = SubscribeToPath {
53 path,
54 direct_id,
55 sender,
56 };
57 let direct_id = self.address.interact(msg).recv().await?;
58 entry.insert(direct_id);
59 Ok(())
60 }
61 Entry::Occupied(_entry) => Err(Reason::AlreadySubscribed(path).into()),
62 }
63 }
64}
65
66pub(super) struct UnsubscribeFromPath {
67 pub path: Path,
68 pub direct_id: ProviderReqId,
69}
70
71impl Action for UnsubscribeFromPath {}
72
73impl BindedProviderLink {
74 pub async fn unsubscribe(&mut self, path: Path, direct_id: ClientReqId) -> Result<(), Error> {
75 let key = (direct_id, path);
76 if let Some(req_id) = self.subscriptions.remove(&key) {
77 let msg = UnsubscribeFromPath {
78 path: key.1,
79 direct_id: req_id,
80 };
81 self.address.act(msg).await?;
82 }
83 Ok(())
84 }
85
86 pub async fn unsubscribe_all(&mut self) {
87 let paths: Vec<_> = self.subscriptions.keys().cloned().collect();
88 for (direct_id, path) in paths {
89 if let Err(err) = self.unsubscribe(path, direct_id).await {
90 log::error!("Unsubscribing all partially failed: {}", err);
91 }
92 }
93 }
94}
95
96#[derive(Debug, Error)]
97enum Reason {
98 #[error("Already subscribed {0}")]
99 AlreadySubscribed(Path),
100 /*
101 #[error("Never subscribed {0}")]
102 NeverSubscribed(Path),
103 */
104}
105
106/*
107/// It's not cloneable, because it tracks subscriptions.
108#[derive(Debug)]
109pub struct ProviderSessionLink {
110 address: Address<ProviderSession>,
111 subscriptions: HashMap<Path, ProviderReqId>,
112}
113
114impl From<Address<ProviderSession>> for ProviderSessionLink {
115 fn from(address: Address<ProviderSession>) -> Self {
116 Self {
117 address,
118 subscriptions: HashMap::new(),
119 }
120 }
121}
122
123pub(super) struct NewRequest {
124 pub request: ServerToProvider,
125}
126
127impl Interaction for NewRequest {
128 type Output = ProviderReqId;
129}
130
131impl ProviderSessionLink {
132 pub async fn subscribe(&mut self, path: Path) -> Result<(), Error> {
133 match self.subscriptions.entry(path.clone()) {
134 Entry::Vacant(entry) => {
135 let request = ServerToProvider::ControlStream { active: true, path };
136 let msg = NewRequest { request };
137 let direct_id = self.address.interact_and_wait(msg).await?;
138 entry.insert(direct_id);
139 Ok(())
140 }
141 Entry::Occupied(_entry) => Err(Reason::AlreadySubscribed(path).into()),
142 }
143 }
144}
145
146pub(super) struct SubRequest {
147 pub direct_id: ProviderReqId,
148 pub request: ServerToProvider,
149}
150
151impl Action for SubRequest {}
152
153impl ProviderSessionLink {
154 // TODO: Move to the separate link
155 // TODO: Add id of the stream (returned before by subscribe call)
156 pub async fn unsubscribe(&mut self, path: Path) -> Result<(), Error> {
157 if let Some(direct_id) = self.subscriptions.remove(&path) {
158 let request = ServerToProvider::ControlStream {
159 active: false,
160 path,
161 };
162 let msg = SubRequest { direct_id, request };
163 self.address.act(msg).await
164 } else {
165 Err(Reason::NeverSubscribed(path).into())
166 }
167 }
168
169 pub async fn unsubscribe_all(&mut self) -> Result<(), Error> {
170 let paths: Vec<_> = self.subscriptions.keys().cloned().collect();
171 for path in paths {
172 self.unsubscribe(path).await?;
173 }
174 Ok(())
175 }
176}
177*/