1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use async_net::SocketAddr;
use async_net::UdpSocket;
use std::net::ToSocketAddrs;
use crate::runtime::dev::prelude::*;
/// Push [Blobs](crate::runtime::Pmt::Blob) into a UDP socket.
///
/// Every blob received on the message input is passed to a single
/// `send_to` call addressed to the remote that was given to
/// [`BlobToUdp::new`].
///
/// # Message Inputs
///
/// `in`: `Pmt::Blob` payloads to send. `Pmt::Finished` terminates the block.
/// Any other `Pmt` variant is logged and ignored.
///
/// # Message Outputs
///
/// No message outputs.
///
/// # Usage
/// ```no_run
/// use futuresdr::blocks::BlobToUdp;
///
/// let udp = BlobToUdp::new("127.0.0.1:2342");
/// ```
#[derive(Block)]
#[message_inputs(r#in)]
pub struct BlobToUdp {
/// Local socket used for sending; `None` until the block's `init` has bound it.
socket: Option<UdpSocket>,
/// Destination address of every datagram, resolved once when the block is constructed.
remote: SocketAddr,
}
impl BlobToUdp {
/// Create [`BlobToUdp`] block
///
/// ## Parameter
/// - `remote`: UDP socket address, e.g., `localhost:2342`
pub fn new<S>(remote: S) -> Self
where
S: AsRef<str>,
{
BlobToUdp {
socket: None,
remote: remote
.as_ref()
.to_socket_addrs()
.expect("could not resolve socket address")
.next()
.unwrap(),
}
}
async fn r#in(
&mut self,
io: &mut WorkIo,
_mo: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
match p {
Pmt::Blob(v) => match self.socket.as_ref().unwrap().send_to(&v, self.remote).await {
Ok(s) => {
assert_eq!(s, v.len());
}
Err(e) => {
println!("udp error: {e:?}");
return Err(e.into());
}
},
Pmt::Finished => {
io.finished = true;
}
_ => {
warn!("BlockToUdp: received wrong PMT type. {:?}", p);
}
}
Ok(Pmt::Null)
}
}
#[doc(hidden)]
impl Kernel for BlobToUdp {
    /// Bind the local socket that is used for sending.
    ///
    /// The socket is bound to the unspecified address of the same IP family
    /// as the remote, with an OS-assigned port. Binding to IPv4 loopback only
    /// would make every send to a non-loopback or IPv6 remote fail.
    ///
    /// Returns an error if binding the socket fails.
    async fn init(&mut self, _mo: &mut MessageOutputs, _b: &mut BlockMeta) -> Result<()> {
        let local = if self.remote.is_ipv4() {
            "0.0.0.0:0"
        } else {
            "[::]:0"
        };
        self.socket = Some(UdpSocket::bind(local).await?);
        Ok(())
    }
}