1use crate::io::codec::BinaryCodec;
2use crate::io::transport::{DirectId, Envelope, Origin, WideEnvelope};
3use derive_more::{AsMut, AsRef, Deref, DerefMut, From, FromStr, Index, Into};
4use meio_protocol::Protocol;
5use serde::{de, Deserialize, Deserializer, Serialize};
6use std::borrow::Borrow;
7use std::convert::TryInto;
8use std::fmt;
9use std::iter::FromIterator;
10use std::str::FromStr;
11use std::time::Duration;
12use thiserror::Error;
13
14pub type ProviderReqId = DirectId<ProviderProtocol>;
15
16#[derive(
18 Serialize,
19 Deserialize,
20 FromStr,
21 Debug,
22 Clone,
23 PartialEq,
24 Eq,
25 PartialOrd,
26 Ord,
27 Hash,
28 Default,
29 From,
30 Into,
31)]
32#[serde(transparent)]
33pub struct EntryId(String);
34
35impl AsRef<str> for EntryId {
36 fn as_ref(&self) -> &str {
37 &self.0
38 }
39}
40
41impl Borrow<str> for EntryId {
42 fn borrow(&self) -> &str {
43 &self.0
44 }
45}
46
47impl From<&str> for EntryId {
48 fn from(value: &str) -> Self {
49 Self(value.to_string())
50 }
51}
52
53impl fmt::Display for EntryId {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 fmt::Display::fmt(&self.0, f)
56 }
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Hash)]
60pub struct PathPattern {
61 pub path: Path,
62}
63
64impl<'de> Deserialize<'de> for PathPattern {
65 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
66 where
67 D: Deserializer<'de>,
68 {
69 let s = String::deserialize(deserializer)?;
70 let path: Path = FromStr::from_str(&s).map_err(de::Error::custom)?;
71 Ok(PathPattern { path })
72 }
73}
74
75impl From<PathPattern> for Path {
76 fn from(pattern: PathPattern) -> Path {
77 pattern.path
78 }
79}
80
81#[derive(
82 Debug,
83 Clone,
84 AsRef,
85 AsMut,
86 Deref,
87 DerefMut,
88 From,
89 Into,
90 Index,
91 Serialize,
92 Deserialize,
93 PartialEq,
94 Eq,
95 PartialOrd,
96 Ord,
97 Hash,
98)]
99pub struct Path(Vec<EntryId>);
101
102impl Path {
103 pub fn single(entry_id: impl Into<EntryId>) -> Self {
104 Self(vec![entry_id.into()])
105 }
106
107 pub fn is_meta(&self) -> bool {
108 self.0
109 .iter()
110 .any(|entry_id| entry_id.as_ref().contains("meta:"))
111 }
112
113 pub fn is_hidden(&self) -> bool {
114 self.0
115 .get(0)
116 .map(|entry_id| entry_id.as_ref().starts_with('@'))
117 .unwrap_or_default()
118 }
119
120 pub fn split(&self) -> (Option<EntryId>, Path) {
156 let mut iter = self.0.iter().cloned();
157 let entry_id = iter.next();
158 let path = Path::from(iter.collect::<Vec<_>>());
159 (entry_id, path)
160 }
161}
162
163impl<'a> FromIterator<&'a EntryId> for Path {
164 fn from_iter<I: IntoIterator<Item = &'a EntryId>>(iter: I) -> Self {
165 Self(iter.into_iter().cloned().collect())
166 }
167}
168
169impl Extend<EntryId> for Path {
170 fn extend<T>(&mut self, iter: T)
171 where
172 T: IntoIterator<Item = EntryId>,
173 {
174 self.0.extend(iter);
175 }
176}
177
178impl IntoIterator for Path {
179 type Item = EntryId;
180 type IntoIter = std::vec::IntoIter<EntryId>;
181
182 fn into_iter(self) -> Self::IntoIter {
183 self.0.into_iter()
184 }
185}
186
187impl Path {
188 pub fn of_server(self) -> Self {
189 let mut server = vec!["@server".into()];
190 server.extend(self);
191 server.into()
192 }
193
194 pub fn of_client(self) -> Self {
195 let mut server = vec!["@self".into()];
196 server.extend(self);
197 server.into()
198 }
199}
200
201impl fmt::Display for Path {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 let mut prefix = false;
212 for entry in self.0.iter() {
213 if prefix {
214 ".".fmt(f)?;
215 } else {
216 prefix = true;
217 }
218 entry.fmt(f)?;
219 }
220 Ok(())
221 }
222}
223
224impl From<EntryId> for Path {
225 fn from(entry_id: EntryId) -> Self {
226 Self(vec![entry_id])
227 }
228}
229
230#[derive(Error, Debug)]
231pub enum PathError {
232 }
234
235impl FromStr for Path {
236 type Err = PathError;
237
238 fn from_str(s: &str) -> Result<Self, Self::Err> {
239 let entries: Vec<_> = s.split('.').map(EntryId::from).collect();
240 Ok(Path::from(entries))
241 }
242}
243
244#[derive(
247 Serialize,
248 Deserialize,
249 From,
250 Into,
251 Default,
252 Debug,
253 Clone,
254 Copy,
255 PartialEq,
256 Eq,
257 PartialOrd,
258 Ord,
259 Hash,
260)]
261pub struct Timestamp(pub i64);
262
263impl From<Duration> for Timestamp {
264 fn from(duration: Duration) -> Self {
265 Self(duration.as_millis() as i64)
267 }
268}
269
270impl TryInto<Duration> for Timestamp {
273 type Error = std::num::TryFromIntError;
274
275 fn try_into(self) -> Result<Duration, Self::Error> {
276 self.0.try_into().map(Duration::from_millis)
277 }
278}
279
280impl Timestamp {
281 pub fn to_f64(self) -> f64 {
283 self.0 as f64
284 }
285
286 pub fn as_secs(&self) -> i64 {
287 self.0 / 1_000
288 }
289
290 pub fn as_millis(&self) -> i64 {
291 self.0
292 }
293}
294
295#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
297pub struct ProviderProtocol;
298
299impl Protocol for ProviderProtocol {
300 type ToServer = WideEnvelope<Self, ProviderToServer>;
301 type ToClient = Envelope<Self, ServerToProvider>;
302 type Codec = BinaryCodec;
303}
304
305impl Origin for ProviderProtocol {}
306
307#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct ServerToProvider {
315 pub path: Path,
316 pub request: RecorderRequest,
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize)]
320pub enum RecorderRequest {
321 Action(RecorderAction),
322 ControlStream(FlowControl),
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize)]
326pub enum RecorderAction {
327 GetFlow,
328 GetSnapshot,
329 DoAction(PackedAction),
330}
331
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub enum FlowControl {
334 StartStream,
335 StopStream,
336}
337
338#[derive(Debug, Clone, From, Into, Serialize, Deserialize, PartialEq, Eq, Hash)]
339pub struct StreamType(String);
340
341impl fmt::Display for StreamType {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 self.0.fmt(f)
344 }
345}
346
347impl From<&str> for StreamType {
348 fn from(name: &str) -> Self {
349 Self(name.into())
350 }
351}
352
353#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
359pub struct Description {
360 pub path: Path,
361 pub stream_type: StreamType,
362}
363
364macro_rules! packed {
365 ($name:ident) => {
366 #[derive(Clone, From, Into, Serialize, Deserialize, PartialEq, Eq)]
367 pub struct $name(pub Vec<u8>);
368
369 impl fmt::Debug for $name {
370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371 f.debug_struct(stringify!($name))
372 .field("size", &self.0.len())
373 .finish()
374 }
375 }
376
377 impl AsRef<[u8]> for $name {
378 fn as_ref(&self) -> &[u8] {
379 self.0.as_ref()
380 }
381 }
382 };
383}
384
385packed!(PackedState);
386packed!(PackedEvent);
387packed!(PackedAction);
388
389#[derive(Debug, Clone, Serialize, Deserialize)]
390pub enum ProviderToServer {
391 Declare {
392 description: Description,
393 },
394 Flow {
396 description: Description,
397 },
398 State {
399 state: PackedState,
400 },
401 Data {
402 delta: PackedEvent,
404 },
405 EndStream,
406 Error {
407 reason: String,
408 },
409}