rill_protocol/io/
provider.rs

1use crate::io::codec::BinaryCodec;
2use crate::io::transport::{DirectId, Envelope, Origin, WideEnvelope};
3use derive_more::{AsMut, AsRef, Deref, DerefMut, From, FromStr, Index, Into};
4use meio_protocol::Protocol;
5use serde::{de, Deserialize, Deserializer, Serialize};
6use std::borrow::Borrow;
7use std::convert::TryInto;
8use std::fmt;
9use std::iter::FromIterator;
10use std::str::FromStr;
11use std::time::Duration;
12use thiserror::Error;
13
14pub type ProviderReqId = DirectId<ProviderProtocol>;
15
16/// An identifier in a hierarchy of the node/metadata/stream.
17#[derive(
18    Serialize,
19    Deserialize,
20    FromStr,
21    Debug,
22    Clone,
23    PartialEq,
24    Eq,
25    PartialOrd,
26    Ord,
27    Hash,
28    Default,
29    From,
30    Into,
31)]
32#[serde(transparent)]
33pub struct EntryId(String);
34
35impl AsRef<str> for EntryId {
36    fn as_ref(&self) -> &str {
37        &self.0
38    }
39}
40
41impl Borrow<str> for EntryId {
42    fn borrow(&self) -> &str {
43        &self.0
44    }
45}
46
47impl From<&str> for EntryId {
48    fn from(value: &str) -> Self {
49        Self(value.to_string())
50    }
51}
52
53impl fmt::Display for EntryId {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        fmt::Display::fmt(&self.0, f)
56    }
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Hash)]
60pub struct PathPattern {
61    pub path: Path,
62}
63
64impl<'de> Deserialize<'de> for PathPattern {
65    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
66    where
67        D: Deserializer<'de>,
68    {
69        let s = String::deserialize(deserializer)?;
70        let path: Path = FromStr::from_str(&s).map_err(de::Error::custom)?;
71        Ok(PathPattern { path })
72    }
73}
74
75impl From<PathPattern> for Path {
76    fn from(pattern: PathPattern) -> Path {
77        pattern.path
78    }
79}
80
81#[derive(
82    Debug,
83    Clone,
84    AsRef,
85    AsMut,
86    Deref,
87    DerefMut,
88    From,
89    Into,
90    Index,
91    Serialize,
92    Deserialize,
93    PartialEq,
94    Eq,
95    PartialOrd,
96    Ord,
97    Hash,
98)]
99// TODO: Consider to use `type Path = Vec<EntryId>` directly
100pub struct Path(Vec<EntryId>);
101
102impl Path {
103    pub fn single(entry_id: impl Into<EntryId>) -> Self {
104        Self(vec![entry_id.into()])
105    }
106
107    pub fn is_meta(&self) -> bool {
108        self.0
109            .iter()
110            .any(|entry_id| entry_id.as_ref().contains("meta:"))
111    }
112
113    pub fn is_hidden(&self) -> bool {
114        self.0
115            .get(0)
116            .map(|entry_id| entry_id.as_ref().starts_with('@'))
117            .unwrap_or_default()
118    }
119
120    /*
121    pub fn root() -> Self {
122        Self(Vec::new())
123    }
124
125    pub fn add_root(&self, entry_id: &EntryId) -> Path {
126        std::iter::once(entry_id.clone())
127            .chain(self.0.iter().cloned())
128            .collect::<Vec<_>>()
129            .into()
130    }
131
132    pub fn concat(&self, entry_id: impl Into<EntryId>) -> Path {
133        let mut cloned = self.clone();
134        cloned.0.push(entry_id.into());
135        cloned
136    }
137    */
138
139    /*
140    pub fn concat(&self, other: &[EntryId]) -> Path {
141        self.0
142            .iter()
143            .chain(other.iter())
144            .cloned()
145            .collect::<Vec<_>>()
146            .into()
147    }
148
149    #[deprecated(since = "0.4.0", note = "Use `split` method instead.")]
150    pub fn subpath(&self, drop_left: usize) -> Path {
151        self.0[drop_left..].to_vec().into()
152    }
153    */
154
155    pub fn split(&self) -> (Option<EntryId>, Path) {
156        let mut iter = self.0.iter().cloned();
157        let entry_id = iter.next();
158        let path = Path::from(iter.collect::<Vec<_>>());
159        (entry_id, path)
160    }
161}
162
163impl<'a> FromIterator<&'a EntryId> for Path {
164    fn from_iter<I: IntoIterator<Item = &'a EntryId>>(iter: I) -> Self {
165        Self(iter.into_iter().cloned().collect())
166    }
167}
168
169impl Extend<EntryId> for Path {
170    fn extend<T>(&mut self, iter: T)
171    where
172        T: IntoIterator<Item = EntryId>,
173    {
174        self.0.extend(iter);
175    }
176}
177
178impl IntoIterator for Path {
179    type Item = EntryId;
180    type IntoIter = std::vec::IntoIter<EntryId>;
181
182    fn into_iter(self) -> Self::IntoIter {
183        self.0.into_iter()
184    }
185}
186
187impl Path {
188    pub fn of_server(self) -> Self {
189        let mut server = vec!["@server".into()];
190        server.extend(self);
191        server.into()
192    }
193
194    pub fn of_client(self) -> Self {
195        let mut server = vec!["@self".into()];
196        server.extend(self);
197        server.into()
198    }
199}
200
201/*
202impl AsRef<[EntryId]> for Path {
203    fn as_ref(&self) -> &[EntryId] {
204        &self.0
205    }
206}
207*/
208
209impl fmt::Display for Path {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        let mut prefix = false;
212        for entry in self.0.iter() {
213            if prefix {
214                ".".fmt(f)?;
215            } else {
216                prefix = true;
217            }
218            entry.fmt(f)?;
219        }
220        Ok(())
221    }
222}
223
224impl From<EntryId> for Path {
225    fn from(entry_id: EntryId) -> Self {
226        Self(vec![entry_id])
227    }
228}
229
230#[derive(Error, Debug)]
231pub enum PathError {
232    // Never constructed yet, because paths never fail now.
233}
234
235impl FromStr for Path {
236    type Err = PathError;
237
238    fn from_str(s: &str) -> Result<Self, Self::Err> {
239        let entries: Vec<_> = s.split('.').map(EntryId::from).collect();
240        Ok(Path::from(entries))
241    }
242}
243
244// `i64` used, becuase it's widely supported as UTC timestamp
245// and for example it's used as timestamp value in BSON format.
246#[derive(
247    Serialize,
248    Deserialize,
249    From,
250    Into,
251    Default,
252    Debug,
253    Clone,
254    Copy,
255    PartialEq,
256    Eq,
257    PartialOrd,
258    Ord,
259    Hash,
260)]
261pub struct Timestamp(pub i64);
262
263impl From<Duration> for Timestamp {
264    fn from(duration: Duration) -> Self {
265        // TODO: Use `try_into` here?
266        Self(duration.as_millis() as i64)
267    }
268}
269
270// TODO: Change to `Into` when possible.
271// When `from_millis(i64)` will be supported.
272impl TryInto<Duration> for Timestamp {
273    type Error = std::num::TryFromIntError;
274
275    fn try_into(self) -> Result<Duration, Self::Error> {
276        self.0.try_into().map(Duration::from_millis)
277    }
278}
279
280impl Timestamp {
281    // TODO: Maybe just impl `ToPrimitive`?
282    pub fn to_f64(self) -> f64 {
283        self.0 as f64
284    }
285
286    pub fn as_secs(&self) -> i64 {
287        self.0 / 1_000
288    }
289
290    pub fn as_millis(&self) -> i64 {
291        self.0
292    }
293}
294
295// TODO: Rename to `ProviderProtocol`
296#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
297pub struct ProviderProtocol;
298
299impl Protocol for ProviderProtocol {
300    type ToServer = WideEnvelope<Self, ProviderToServer>;
301    type ToClient = Envelope<Self, ServerToProvider>;
302    type Codec = BinaryCodec;
303}
304
305impl Origin for ProviderProtocol {}
306
307/* ? TODO: Remove
308pub type ServerRequest = Envelope<ProviderProtocol, ServerToProvider>;
309
310pub type ProviderResponse = WideEnvelope<ProviderProtocol, ProviderToServer>;
311*/
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct ServerToProvider {
315    pub path: Path,
316    pub request: RecorderRequest,
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize)]
320pub enum RecorderRequest {
321    Action(RecorderAction),
322    ControlStream(FlowControl),
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize)]
326pub enum RecorderAction {
327    GetFlow,
328    GetSnapshot,
329    DoAction(PackedAction),
330}
331
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub enum FlowControl {
334    StartStream,
335    StopStream,
336}
337
338#[derive(Debug, Clone, From, Into, Serialize, Deserialize, PartialEq, Eq, Hash)]
339pub struct StreamType(String);
340
341impl fmt::Display for StreamType {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        self.0.fmt(f)
344    }
345}
346
347impl From<&str> for StreamType {
348    fn from(name: &str) -> Self {
349        Self(name.into())
350    }
351}
352
353// TODO: Consider removing and use `domain-scope` descriptions:
354// - for packs
355// - for internal server things
356// - for server ui controls
357// - etc.
358#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
359pub struct Description {
360    pub path: Path,
361    pub stream_type: StreamType,
362}
363
364macro_rules! packed {
365    ($name:ident) => {
366        #[derive(Clone, From, Into, Serialize, Deserialize, PartialEq, Eq)]
367        pub struct $name(pub Vec<u8>);
368
369        impl fmt::Debug for $name {
370            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371                f.debug_struct(stringify!($name))
372                    .field("size", &self.0.len())
373                    .finish()
374            }
375        }
376
377        impl AsRef<[u8]> for $name {
378            fn as_ref(&self) -> &[u8] {
379                self.0.as_ref()
380            }
381        }
382    };
383}
384
385packed!(PackedState);
386packed!(PackedEvent);
387packed!(PackedAction);
388
389#[derive(Debug, Clone, Serialize, Deserialize)]
390pub enum ProviderToServer {
391    Declare {
392        description: Description,
393    },
394    /// The response to `ControlStream { active: true }` request
395    Flow {
396        description: Description,
397    },
398    State {
399        state: PackedState,
400    },
401    Data {
402        /// Aggregated events.
403        delta: PackedEvent,
404    },
405    EndStream,
406    Error {
407        reason: String,
408    },
409}