remoc_obs/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(missing_docs)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![doc(
5    html_logo_url = "https://raw.githubusercontent.com/ENQT-GmbH/remoc/master/.misc/Remoc.png",
6    html_favicon_url = "https://raw.githubusercontent.com/ENQT-GmbH/remoc/master/.misc/Remoc.png"
7)]
8#![deprecated = "remoc-obs has been integrated into remoc as module remoc::robs. Please update your references."]
9
10//! Remotely observable collections.
11//!
12//! **This crate is deprecated:** Remoc-obs has been integrated into [Remoc] as of version 0.10.
13//! Please update your references to use the [`remoc::robs`] module.
14//! Development continues as part of the [Remoc repository].
15//!
16//! [Remoc]: https://crates.io/crates/remoc
17//! [Remoc repository]: https://github.com/ENQT-GmbH/remoc
18//! [`remoc::robs`]: https://docs.rs/remoc/latest/remoc/robs/index.html
19//!
20//! This crate provides collections that emit an event for each change.
21//! This event stream can be sent to a local or remote endpoint (using [remoc]),
22//! where it can be either processed event-wise or a mirrored collection can
23//! be built from it.
24//!
25
26pub mod hash_map;
27pub mod hash_set;
28pub mod list;
29pub mod vec;
30
31use remoc::prelude::*;
32use serde::{Deserialize, Serialize};
33use std::{error::Error, fmt};
34use tokio::sync::watch;
35
36/// An error occurred during sending an event for an observable collection.
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub enum SendError {
39    /// Sending to a remote endpoint failed.
40    RemoteSend(rch::base::SendErrorKind),
41    /// Connecting a sent channel failed.
42    RemoteConnect(chmux::ConnectError),
43    /// Listening to a received channel failed.
44    RemoteListen(chmux::ListenerError),
45    /// Forwarding at a remote endpoint to another remote endpoint failed.
46    RemoteForward,
47}
48
49impl fmt::Display for SendError {
50    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51        match self {
52            Self::RemoteSend(err) => write!(f, "send error: {}", err),
53            Self::RemoteConnect(err) => write!(f, "connect error: {}", err),
54            Self::RemoteListen(err) => write!(f, "listen error: {}", err),
55            Self::RemoteForward => write!(f, "forwarding error"),
56        }
57    }
58}
59
60impl Error for SendError {}
61
62impl<T> TryFrom<rch::broadcast::SendError<T>> for SendError {
63    type Error = rch::broadcast::SendError<T>;
64
65    fn try_from(err: rch::broadcast::SendError<T>) -> Result<Self, Self::Error> {
66        match err {
67            rch::broadcast::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
68            rch::broadcast::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
69            rch::broadcast::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
70            rch::broadcast::SendError::RemoteForward => Ok(Self::RemoteForward),
71            other @ rch::broadcast::SendError::Closed(_) => Err(other),
72        }
73    }
74}
75
76impl<T> TryFrom<rch::mpsc::SendError<T>> for SendError {
77    type Error = rch::mpsc::SendError<T>;
78
79    fn try_from(err: rch::mpsc::SendError<T>) -> Result<Self, Self::Error> {
80        match err {
81            rch::mpsc::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
82            rch::mpsc::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
83            rch::mpsc::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
84            rch::mpsc::SendError::RemoteForward => Ok(Self::RemoteForward),
85            other @ rch::mpsc::SendError::Closed(_) => Err(other),
86        }
87    }
88}
89
90/// An error occurred during receiving an event or initial value of an observed collection.
91#[derive(Clone, Debug, Serialize, Deserialize)]
92pub enum RecvError {
93    /// The observed collection was dropped before `done` was called on it.
94    Closed,
95    /// The receiver lagged behind, so that the the send buffer size has reached its limit.
96    ///
97    /// Try increasing the send buffer specified when calling `subscribe` on the
98    /// observed collection.
99    Lagged,
100    /// The maximum size of the mirrored collection has been reached.
101    MaxSizeExceeded(usize),
102    /// Receiving from a remote endpoint failed.
103    RemoteReceive(rch::base::RecvError),
104    /// Connecting a sent channel failed.
105    RemoteConnect(chmux::ConnectError),
106    /// Listening for a connection from a received channel failed.
107    RemoteListen(chmux::ListenerError),
108    /// Invalid index.
109    InvalidIndex(usize),
110}
111
112impl fmt::Display for RecvError {
113    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114        match self {
115            Self::Closed => write!(f, "observed collection was dropped"),
116            Self::Lagged => write!(f, "observation lagged behind"),
117            Self::MaxSizeExceeded(size) => write!(f, "mirrored collection reached it maximum size of {}", size),
118            Self::RemoteReceive(err) => write!(f, "receive error: {}", err),
119            Self::RemoteConnect(err) => write!(f, "connect error: {}", err),
120            Self::RemoteListen(err) => write!(f, "listen error: {}", err),
121            Self::InvalidIndex(idx) => write!(f, "index {idx} is invalid"),
122        }
123    }
124}
125
126impl Error for RecvError {}
127
128impl From<rch::broadcast::RecvError> for RecvError {
129    fn from(err: rch::broadcast::RecvError) -> Self {
130        match err {
131            rch::broadcast::RecvError::Closed => Self::Closed,
132            rch::broadcast::RecvError::Lagged => Self::Lagged,
133            rch::broadcast::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
134            rch::broadcast::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
135            rch::broadcast::RecvError::RemoteListen(err) => Self::RemoteListen(err),
136        }
137    }
138}
139
140impl From<rch::mpsc::RecvError> for RecvError {
141    fn from(err: rch::mpsc::RecvError) -> Self {
142        match err {
143            rch::mpsc::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
144            rch::mpsc::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
145            rch::mpsc::RecvError::RemoteListen(err) => Self::RemoteListen(err),
146        }
147    }
148}
149
150/// Sends an event.
151pub(crate) fn send_event<E, Codec>(tx: &rch::broadcast::Sender<E, Codec>, on_err: &dyn Fn(SendError), event: E)
152where
153    Codec: remoc::codec::Codec,
154    E: RemoteSend + Clone,
155{
156    match tx.send(event) {
157        Ok(()) => (),
158        Err(err) if err.is_disconnected() => (),
159        Err(err) => match err.try_into() {
160            Ok(err) => (on_err)(err),
161            Err(_) => unreachable!(),
162        },
163    }
164}
165
166/// Default handler for sending errors.
167pub(crate) fn default_on_err(err: SendError) {
168    tracing::warn!("sending failed: {}", err);
169}
170
171/// The observed object has been dropped.
172#[derive(Clone, Debug, Serialize, Deserialize)]
173pub struct DroppedError;
174
175impl fmt::Display for DroppedError {
176    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177        write!(f, "dropped")
178    }
179}
180
181impl Error for DroppedError {}
182
183/// Sends change notifications.
184pub(crate) struct ChangeSender {
185    tx: watch::Sender<()>,
186    rx: watch::Receiver<()>,
187}
188
189impl ChangeSender {
190    /// Create a new instance.
191    pub fn new() -> Self {
192        let (tx, rx) = watch::channel(());
193        Self { tx, rx }
194    }
195
196    /// Return a subscribed [ChangeNotifier].
197    pub fn subscribe(&self) -> ChangeNotifier {
198        ChangeNotifier(self.rx.clone())
199    }
200
201    /// Notify all subscribed [ChangeNotifier]s.
202    pub fn notify(&self) {
203        self.tx.send_replace(());
204    }
205}
206
207/// Notifies a local observer of changes to an observable collection.
208///
209/// This can be cloned, but not sent to remote endpoints.
210#[derive(Clone)]
211pub struct ChangeNotifier(watch::Receiver<()>);
212
213impl fmt::Debug for ChangeNotifier {
214    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215        f.debug_tuple("ChangeNotifier").finish()
216    }
217}
218
219impl ChangeNotifier {
220    /// Returns when the collection has been changed and marks the
221    /// newest value as seen.
222    pub async fn changed(&mut self) -> Result<(), DroppedError> {
223        self.0.changed().await.map_err(|_| DroppedError)
224    }
225
226    /// Marks the current value as seen, so that [changed](Self::changed)
227    /// will not return immediately.
228    pub fn update(&mut self) {
229        self.0.borrow_and_update();
230    }
231}