fire_stream/handler/
mod.rs1pub(crate) mod client;
5pub(crate) mod server;
6
7use crate::error::{StreamError, TaskError};
8use crate::util::watch;
9
10use tokio::sync::{oneshot, mpsc};
11use tokio::task::JoinHandle;
12
/// Outcome of handling something of packet type `P`.
///
/// NOTE(review): the variant names suggest this tells the connection task
/// what to send back to the peer and whether to close afterwards; the exact
/// semantics live in the `client` / `server` handlers — confirm there.
pub(crate) enum SendBack<P> {
    /// Nothing to send back.
    None,
    /// Carries a packet of type `P`.
    Packet(P),
    /// Close, without a payload.
    Close,
    /// Close variant that carries no payload of its own; presumably the
    /// handler builds the closing packet itself — TODO confirm.
    CloseWithPacket,
}
20
21pub(crate) struct TaskHandle {
24 pub close: oneshot::Sender<()>,
25 pub task: JoinHandle<Result<(), TaskError>>
26}
27
28impl TaskHandle {
29 pub async fn wait(self) -> Result<(), TaskError> {
32 self.task.await
33 .map_err(TaskError::Join)?
34 }
35
36 pub async fn close(self) -> Result<(), TaskError> {
38 let _ = self.close.send(());
39 self.task.await
40 .map_err(TaskError::Join)?
41 }
42
43 #[cfg(test)]
45 pub fn abort(self) {
46 self.task.abort();
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct StreamSender<P> {
53 pub(crate) inner: mpsc::Sender<P>
54}
55
56impl<P> StreamSender<P> {
57 pub(crate) fn new(inner: mpsc::Sender<P>) -> Self {
58 Self { inner }
59 }
60
61 pub async fn send(&self, packet: P) -> Result<(), StreamError> {
63 self.inner.send(packet).await
64 .map_err(|_| StreamError::StreamAlreadyClosed)
65 }
66}
67
68#[derive(Debug)]
70pub struct StreamReceiver<P> {
71 pub(crate) inner: mpsc::Receiver<P>
72}
73
74impl<P> StreamReceiver<P> {
75 pub(crate) fn new(inner: mpsc::Receiver<P>) -> Self {
76 Self { inner }
77 }
78
79 pub async fn receive(&mut self) -> Option<P> {
82 self.inner.recv().await
83 }
84
85 pub fn close(&mut self) {
88 self.inner.close();
89 }
90}
91
92#[derive(Debug, Clone)]
93pub struct Configurator<C> {
94 inner: watch::Sender<C>
95}
96
97impl<C> Configurator<C> {
98 pub(crate) fn new(inner: watch::Sender<C>) -> Self {
99 Self { inner }
100 }
101
102 pub fn update(&self, cfg: C) {
106 self.inner.send(cfg);
107 }
108
109 pub fn read(&self) -> C
110 where C: Clone {
111 self.inner.newest()
112 }
113}