1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::time::{Duration, Instant, SystemTime};

use crate::core::utils::Sealed;
use crate::error::{RecvResult, RecvTimeoutError, RecvTimeoutResult, TryRecvError, TryRecvResult};
use crate::prelude::*;
use crate::sync::prelude::*;
/// <sup>🔒</sup>
/// Synchronous API for receiving node events.
///
/// Exposes four ways to obtain the next [`Event`]: a blocking call ([`recv`]), a call bounded
/// by a timeout ([`recv_timeout`]), a non-blocking call ([`try_recv`]), and an iterator over
/// events ([`events`]).
///
/// 🔒 This trait is sealed 🔒
///
/// [`recv`]: ReceiveEvent::recv
/// [`recv_timeout`]: ReceiveEvent::recv_timeout
/// [`try_recv`]: ReceiveEvent::try_recv
/// [`events`]: ReceiveEvent::events
pub trait ReceiveEvent<V: MaybeVersioned>: Sealed {
/// <sup>[`sync`](crate::sync)</sup>
/// Receives the next node [`Event`].
///
/// Blocks until an event is received.
///
/// If you want to wait no longer than a given duration, use [`recv_timeout`].
/// If you want to check for the next event without blocking, use [`try_recv`].
///
/// [`recv_timeout`]: Self::recv_timeout
/// [`try_recv`]: Self::try_recv
fn recv(&self) -> RecvResult<Event<V>>;
/// <sup>[`sync`](crate::sync)</sup>
/// Attempts to receive the next node [`Event`] within a `timeout`.
///
/// Blocks until an event is received or the deadline is reached.
///
/// If you want to block until the next event is received, use [`recv`].
/// If you want to check for the next event without blocking, use [`try_recv`].
///
/// [`recv`]: Self::recv
/// [`try_recv`]: Self::try_recv
fn recv_timeout(&self, timeout: Duration) -> RecvTimeoutResult<Event<V>>;
/// <sup>[`sync`](crate::sync)</sup>
/// Attempts to receive the next node [`Event`] without blocking.
///
/// If you want to block until the next event is received, use [`recv`].
/// If you want to wait no longer than a given duration, use [`recv_timeout`].
///
/// [`recv`]: Self::recv
/// [`recv_timeout`]: Self::recv_timeout
fn try_recv(&self) -> TryRecvResult<Event<V>>;
/// <sup>[`sync`](crate::sync)</sup>
/// Subscribes to node events.
///
/// Blocks while the underlying node is active.
///
/// If you are interested only in valid incoming frames, use [`frames`], [`recv_frame`],
/// [`recv_frame_timeout`], or [`try_recv_frame`] instead.
///
/// [`recv_frame`]: ReceiveFrame::recv_frame
/// [`recv_frame_timeout`]: ReceiveFrame::recv_frame_timeout
/// [`try_recv_frame`]: ReceiveFrame::try_recv_frame
/// [`frames`]: ReceiveFrame::frames
fn events(&self) -> impl Iterator<Item = Event<V>>;
}
/// <sup>🔒</sup>
/// Synchronous API for receiving valid MAVLink frames.
///
/// Every method has a default implementation built on top of the corresponding
/// [`ReceiveEvent`] method: events other than [`Event::Frame`] are discarded and only the
/// `(frame, callback)` pair carried by [`Event::Frame`] is handed to the caller.
///
/// 🔒 This trait is sealed 🔒
pub trait ReceiveFrame<V: MaybeVersioned>: ReceiveEvent<V> {
    /// <sup>[`sync`](crate::sync)</sup>
    /// Receives the next frame. Blocks until a valid frame is received or the channel is closed.
    ///
    /// If you want to block until the next frame within a timeout, use [`recv_frame_timeout`].
    /// If you want to check for the next frame without blocking, use [`try_recv_frame`].
    ///
    /// **⚠** This method skips all invalid frames. If you are interested in such frames, use
    /// [`events`] or [`recv`] instead to receive [`Event::Invalid`] event that contains invalid
    /// frame with the corresponding error.
    ///
    /// # Errors
    ///
    /// Any error returned by [`recv`] is propagated unchanged.
    ///
    /// [`recv_frame_timeout`]: Self::recv_frame_timeout
    /// [`try_recv_frame`]: Self::try_recv_frame
    /// [`events`]: ReceiveEvent::events
    /// [`recv`]: ReceiveEvent::recv
    fn recv_frame(&self) -> RecvResult<(Frame<V>, Callback<V>)> {
        loop {
            // Non-frame events are dropped; keep waiting for the next frame.
            if let Event::Frame(frame, callback) = self.recv()? {
                return Ok((frame, callback));
            }
        }
    }

    /// <sup>[`sync`](crate::sync)</sup>
    /// Attempts to receive the next frame until the timeout is reached. Blocks until a valid
    /// frame is received, the deadline is reached, or the channel is closed.
    ///
    /// The `timeout` bounds the whole call: time spent receiving and discarding non-frame
    /// events is subtracted from the budget of the subsequent waits. Elapsed time is measured
    /// with a monotonic clock, so wall-clock adjustments do not extend or shorten the wait.
    ///
    /// If you want to block until the next frame is received, use [`recv_frame`].
    /// If you want to check for the next frame without blocking, use [`try_recv_frame`].
    ///
    /// **⚠** This method skips all invalid frames. If you are interested in such frames, use
    /// [`events`] or [`recv_timeout`] instead to receive [`Event::Invalid`] event that contains
    /// invalid frame with the corresponding error.
    ///
    /// # Errors
    ///
    /// Returns [`RecvTimeoutError::Timeout`] once more than `timeout` has elapsed while only
    /// non-frame events were received. Any error returned by [`recv_timeout`] is propagated
    /// unchanged.
    ///
    /// [`recv_frame`]: Self::recv_frame
    /// [`try_recv_frame`]: Self::try_recv_frame
    /// [`events`]: ReceiveEvent::events
    /// [`recv_timeout`]: ReceiveEvent::recv_timeout
    fn recv_frame_timeout(&self, timeout: Duration) -> RecvTimeoutResult<(Frame<V>, Callback<V>)> {
        let started = Instant::now();
        let mut remaining = timeout;

        loop {
            if let Event::Frame(frame, callback) = self.recv_timeout(remaining)? {
                return Ok((frame, callback));
            }

            // A non-frame event consumed part of the budget: shrink the next wait accordingly
            // and give up as soon as the budget is exhausted.
            remaining = timeout
                .checked_sub(started.elapsed())
                .ok_or(RecvTimeoutError::Timeout)?;
        }
    }

    /// <sup>[`sync`](crate::sync)</sup>
    /// Attempts to receive the next valid frame. Returns immediately if channel is empty.
    ///
    /// Non-frame events that are already pending are consumed and discarded until either a
    /// frame is found or [`try_recv`] reports an error, so a pending frame is never hidden
    /// behind an earlier non-frame event.
    ///
    /// If you want to block until the next frame within a timeout, use [`recv_frame_timeout`].
    /// If you want to block until the next frame is received, use [`recv_frame`].
    ///
    /// **⚠** This method skips all invalid frames. If you are interested in such frames, use
    /// [`events`] or [`try_recv`] instead to receive [`Event::Invalid`] event that contains invalid
    /// frame with the corresponding error.
    ///
    /// # Errors
    ///
    /// Any error returned by [`try_recv`] (such as [`TryRecvError::Empty`] when nothing is
    /// pending) is propagated unchanged.
    ///
    /// [`recv_frame`]: Self::recv_frame
    /// [`recv_frame_timeout`]: Self::recv_frame_timeout
    /// [`events`]: ReceiveEvent::events
    /// [`try_recv`]: ReceiveEvent::try_recv
    fn try_recv_frame(&self) -> TryRecvResult<(Frame<V>, Callback<V>)> {
        loop {
            // `try_recv` never blocks, so this loop ends as soon as the pending events are
            // drained (the error is returned via `?`) or a frame shows up.
            if let Event::Frame(frame, callback) = self.try_recv()? {
                return Ok((frame, callback));
            }
        }
    }

    /// <sup>[`sync`](crate::sync)</sup>
    /// Subscribes to valid MAVLink frames.
    ///
    /// Blocks while the underlying node is active.
    ///
    /// **⚠** This method skips all invalid frames. If you are interested in such frames, use
    /// [`events`], [`recv`], [`recv_timeout`], or [`try_recv`] instead to receive
    /// [`Event::Invalid`] event that contains invalid frame with the corresponding error.
    ///
    /// [`recv`]: ReceiveEvent::recv
    /// [`recv_timeout`]: ReceiveEvent::recv_timeout
    /// [`try_recv`]: ReceiveEvent::try_recv
    /// [`events`]: ReceiveEvent::events
    fn frames(&self) -> impl Iterator<Item = (Frame<V>, Callback<V>)> {
        self.events().filter_map(|event| {
            if let Event::Frame(frame, callback) = event {
                Some((frame, callback))
            } else {
                None
            }
        })
    }
}