1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use ether_dream::dac;
use ether_dream::protocol::{DacBroadcast, SizeBytes, WriteBytes, BROADCAST_PORT};
use futures::prelude::*;
use smol::Async;
use std::{io, net, time};
pub struct Broadcaster {
dac: dac::Addressed,
udp_socket: Async<net::UdpSocket>,
broadcast_addr: net::SocketAddrV4,
bytes: [u8; DacBroadcast::SIZE_BYTES],
}
#[derive(Debug)]
pub enum Message {
Dac(dac::Addressed),
Send,
Close,
}
impl Broadcaster {
pub fn new(
dac: dac::Addressed,
bind_port: u16,
broadcast_ip: net::Ipv4Addr,
) -> io::Result<Broadcaster> {
let broadcast_addr = net::SocketAddrV4::new(broadcast_ip, BROADCAST_PORT);
let bind_addr = net::SocketAddrV4::new([0, 0, 0, 0].into(), bind_port);
let udp_socket = Async::<net::UdpSocket>::bind(bind_addr)?;
udp_socket.get_ref().set_broadcast(true)?;
let bytes = [0u8; DacBroadcast::SIZE_BYTES];
Ok(Broadcaster {
dac,
udp_socket,
broadcast_addr,
bytes,
})
}
pub fn create_broadcast(&self) -> DacBroadcast {
let dac_status = self.dac.status.to_protocol();
DacBroadcast {
mac_address: self.dac.mac_address.into(),
hw_revision: self.dac.hw_revision,
sw_revision: self.dac.sw_revision,
buffer_capacity: self.dac.buffer_capacity,
max_point_rate: self.dac.max_point_rate,
dac_status,
}
}
pub async fn send(&mut self) -> io::Result<()> {
{
let dac_broadcast = self.create_broadcast();
let mut writer = &mut self.bytes[..];
writer.write_bytes(&dac_broadcast)?;
}
let addr = self.broadcast_addr.clone();
self.udp_socket.send_to(&self.bytes, addr).await?;
Ok(())
}
pub async fn run<M>(&mut self, mut msgs: M) -> io::Result<()>
where
M: Stream<Item = Message> + Unpin,
{
while let Some(msg) = msgs.next().await {
match msg {
Message::Send => self.send().await?,
Message::Dac(dac) => self.dac = dac,
Message::Close => break,
}
}
Ok(())
}
}
pub fn timer_stream(interval: time::Duration) -> impl Stream<Item = ()> {
let delay_iter = (0..).map(move |_| smol::Timer::after(interval));
let timer = futures::stream::iter(delay_iter);
timer.filter_map(|delay| async move {
delay.await;
Some(())
})
}
pub fn one_hz() -> impl Stream<Item = ()> {
timer_stream(time::Duration::from_secs(1))
}
pub fn one_hz_send() -> impl Stream<Item = Message> {
one_hz().map(|()| Message::Send)
}