1#![forbid(unsafe_code)]
2#![warn(missing_docs)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![doc(
5 html_logo_url = "https://raw.githubusercontent.com/ENQT-GmbH/remoc/master/.misc/Remoc.png",
6 html_favicon_url = "https://raw.githubusercontent.com/ENQT-GmbH/remoc/master/.misc/Remoc.png"
7)]
8#![deprecated = "remoc-obs has been integrated into remoc as module remoc::robs. Please update your references."]
9
10pub mod hash_map;
27pub mod hash_set;
28pub mod list;
29pub mod vec;
30
31use remoc::prelude::*;
32use serde::{Deserialize, Serialize};
33use std::{error::Error, fmt};
34use tokio::sync::watch;
35
/// Error reported when sending an event fails.
///
/// The variants correspond to the non-`Closed` variants of
/// `rch::broadcast::SendError` and `rch::mpsc::SendError`; values of those
/// types can be converted into this one through the `TryFrom` implementations
/// in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SendError {
    /// Sending failed; carries the underlying `rch::base::SendErrorKind`.
    RemoteSend(rch::base::SendErrorKind),
    /// Connecting failed; carries the underlying `chmux::ConnectError`.
    RemoteConnect(chmux::ConnectError),
    /// Listening failed; carries the underlying `chmux::ListenerError`.
    RemoteListen(chmux::ListenerError),
    /// Forwarding failed.
    RemoteForward,
}
48
49impl fmt::Display for SendError {
50 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51 match self {
52 Self::RemoteSend(err) => write!(f, "send error: {}", err),
53 Self::RemoteConnect(err) => write!(f, "connect error: {}", err),
54 Self::RemoteListen(err) => write!(f, "listen error: {}", err),
55 Self::RemoteForward => write!(f, "forwarding error"),
56 }
57 }
58}
59
60impl Error for SendError {}
61
62impl<T> TryFrom<rch::broadcast::SendError<T>> for SendError {
63 type Error = rch::broadcast::SendError<T>;
64
65 fn try_from(err: rch::broadcast::SendError<T>) -> Result<Self, Self::Error> {
66 match err {
67 rch::broadcast::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
68 rch::broadcast::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
69 rch::broadcast::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
70 rch::broadcast::SendError::RemoteForward => Ok(Self::RemoteForward),
71 other @ rch::broadcast::SendError::Closed(_) => Err(other),
72 }
73 }
74}
75
76impl<T> TryFrom<rch::mpsc::SendError<T>> for SendError {
77 type Error = rch::mpsc::SendError<T>;
78
79 fn try_from(err: rch::mpsc::SendError<T>) -> Result<Self, Self::Error> {
80 match err {
81 rch::mpsc::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
82 rch::mpsc::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
83 rch::mpsc::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
84 rch::mpsc::SendError::RemoteForward => Ok(Self::RemoteForward),
85 other @ rch::mpsc::SendError::Closed(_) => Err(other),
86 }
87 }
88}
89
/// Error reported on the receiving side of an observation.
///
/// Values of `rch::broadcast::RecvError` and `rch::mpsc::RecvError` convert
/// into this type through the `From` implementations in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecvError {
    /// The observed collection was dropped.
    Closed,
    /// The observation lagged behind.
    Lagged,
    /// The mirrored collection reached its maximum size; the payload is that size.
    MaxSizeExceeded(usize),
    /// Receiving failed; carries the underlying `rch::base::RecvError`.
    RemoteReceive(rch::base::RecvError),
    /// Connecting failed; carries the underlying `chmux::ConnectError`.
    RemoteConnect(chmux::ConnectError),
    /// Listening failed; carries the underlying `chmux::ListenerError`.
    RemoteListen(chmux::ListenerError),
    /// The contained index is invalid.
    InvalidIndex(usize),
}
111
112impl fmt::Display for RecvError {
113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114 match self {
115 Self::Closed => write!(f, "observed collection was dropped"),
116 Self::Lagged => write!(f, "observation lagged behind"),
117 Self::MaxSizeExceeded(size) => write!(f, "mirrored collection reached it maximum size of {}", size),
118 Self::RemoteReceive(err) => write!(f, "receive error: {}", err),
119 Self::RemoteConnect(err) => write!(f, "connect error: {}", err),
120 Self::RemoteListen(err) => write!(f, "listen error: {}", err),
121 Self::InvalidIndex(idx) => write!(f, "index {idx} is invalid"),
122 }
123 }
124}
125
126impl Error for RecvError {}
127
128impl From<rch::broadcast::RecvError> for RecvError {
129 fn from(err: rch::broadcast::RecvError) -> Self {
130 match err {
131 rch::broadcast::RecvError::Closed => Self::Closed,
132 rch::broadcast::RecvError::Lagged => Self::Lagged,
133 rch::broadcast::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
134 rch::broadcast::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
135 rch::broadcast::RecvError::RemoteListen(err) => Self::RemoteListen(err),
136 }
137 }
138}
139
140impl From<rch::mpsc::RecvError> for RecvError {
141 fn from(err: rch::mpsc::RecvError) -> Self {
142 match err {
143 rch::mpsc::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
144 rch::mpsc::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
145 rch::mpsc::RecvError::RemoteListen(err) => Self::RemoteListen(err),
146 }
147 }
148}
149
150pub(crate) fn send_event<E, Codec>(tx: &rch::broadcast::Sender<E, Codec>, on_err: &dyn Fn(SendError), event: E)
152where
153 Codec: remoc::codec::Codec,
154 E: RemoteSend + Clone,
155{
156 match tx.send(event) {
157 Ok(()) => (),
158 Err(err) if err.is_disconnected() => (),
159 Err(err) => match err.try_into() {
160 Ok(err) => (on_err)(err),
161 Err(_) => unreachable!(),
162 },
163 }
164}
165
166pub(crate) fn default_on_err(err: SendError) {
168 tracing::warn!("sending failed: {}", err);
169}
170
/// Error returned by [`ChangeNotifier::changed`] when waiting on the
/// underlying watch channel fails.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DroppedError;
174
175impl fmt::Display for DroppedError {
176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 write!(f, "dropped")
178 }
179}
180
181impl Error for DroppedError {}
182
183pub(crate) struct ChangeSender {
185 tx: watch::Sender<()>,
186 rx: watch::Receiver<()>,
187}
188
189impl ChangeSender {
190 pub fn new() -> Self {
192 let (tx, rx) = watch::channel(());
193 Self { tx, rx }
194 }
195
196 pub fn subscribe(&self) -> ChangeNotifier {
198 ChangeNotifier(self.rx.clone())
199 }
200
201 pub fn notify(&self) {
203 self.tx.send_replace(());
204 }
205}
206
/// Receiving half of a change notification.
///
/// Wraps a `tokio` watch receiver; use [`ChangeNotifier::changed`] to wait for
/// a change and [`ChangeNotifier::update`] to mark the current state as seen.
#[derive(Clone)]
pub struct ChangeNotifier(watch::Receiver<()>);
212
213impl fmt::Debug for ChangeNotifier {
214 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215 f.debug_tuple("ChangeNotifier").finish()
216 }
217}
218
219impl ChangeNotifier {
220 pub async fn changed(&mut self) -> Result<(), DroppedError> {
223 self.0.changed().await.map_err(|_| DroppedError)
224 }
225
226 pub fn update(&mut self) {
229 self.0.borrow_and_update();
230 }
231}