rnacos/naming/
udp_actor.rs1#![allow(unused_imports, unused_must_use)]
2use actix::prelude::*;
3use std::borrow::Cow;
4use std::env;
5use std::error::Error;
6use std::io::stdin;
7use std::net::SocketAddr;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::net::UdpSocket;
11use tokio::signal;
12use tokio::sync::Mutex;
13
14use super::listener::{InnerNamingListener, NamingListenerCmd};
15
/// Largest payload a single UDP datagram can carry over IPv4:
/// the 65_535-byte IP packet limit minus the 20-byte IPv4 header and the
/// 8-byte UDP header.
const MAX_DATAGRAM_SIZE: usize = 65_535 - 20 - 8;
17pub struct UdpWorker {
18 local_addr_str: Option<String>,
19 socket: Option<Arc<UdpSocket>>,
20 addr: Option<Addr<InnerNamingListener>>,
21 udp_port: u16,
22 buf: Option<Vec<u8>>,
23}
24
25impl UdpWorker {
26 pub fn new(addr: Option<Addr<InnerNamingListener>>) -> Self {
27 Self {
28 local_addr_str: None,
29 socket: None,
30 addr,
31 udp_port: 0,
32 buf: Some(vec![]),
33 }
34 }
35
36 pub fn new_with_socket(socket: UdpSocket, addr: Option<Addr<InnerNamingListener>>) -> Self {
37 let local_addr = socket.local_addr().unwrap();
38 let udp_port = local_addr.port();
39 Self {
40 local_addr_str: None,
41 socket: Some(Arc::new(socket)),
42 addr,
43 udp_port,
44 buf: Some(vec![]),
45 }
46 }
47
48 fn init(&mut self, ctx: &mut actix::Context<Self>) {
49 self.init_socket(ctx);
50 }
52
53 fn init_socket(&mut self, ctx: &mut actix::Context<Self>) {
54 if self.socket.is_some() {
55 self.init_loop_recv(ctx);
56 return;
57 }
58 let local_addr_str = if let Some(addr) = self.local_addr_str.as_ref() {
59 addr.to_owned()
60 } else {
61 "0.0.0.0:0".to_owned()
62 };
63 async move { UdpSocket::bind(&local_addr_str).await.unwrap() }
64 .into_actor(self)
65 .map(|r, act, ctx| {
66 act.udp_port = r.local_addr().unwrap().port();
67 act.socket = Some(Arc::new(r));
68 act.init_loop_recv(ctx);
69 })
70 .wait(ctx);
71 }
72
73 fn init_loop_recv(&mut self, ctx: &mut actix::Context<Self>) {
74 let socket = self.socket.as_ref().unwrap().clone();
75 let notify_addr = self.addr.clone();
76 let buf = self.buf.replace(Vec::new());
77 async move {
78 let mut buf = buf.unwrap_or_default();
79 if buf.len() < MAX_DATAGRAM_SIZE {
80 buf = vec![0u8; MAX_DATAGRAM_SIZE];
81 }
82 while let Ok((_len, addr)) = socket.recv_from(&mut buf).await {
83 let msg = NamingListenerCmd::Response(addr.to_owned());
87 if let Some(_notify_addr) = ¬ify_addr {
90 _notify_addr.do_send(msg);
91 }
92 }
93 buf
94 }
95 .into_actor(self)
96 .map(|buf, act, ctx| {
97 act.buf.replace(buf);
98 ctx.run_later(Duration::from_secs(1), |act, ctx| {
99 act.init_loop_recv(ctx);
100 });
101 })
102 .spawn(ctx);
103 }
104}
105
106impl Actor for UdpWorker {
107 type Context = Context<Self>;
108
109 fn started(&mut self, ctx: &mut Self::Context) {
110 log::info!(" UdpWorker started");
111 self.init(ctx);
112 }
113}
114
115#[derive(Debug, Message)]
116#[rtype(result = "Result<(),std::io::Error>")]
117pub struct UdpSenderCmd {
118 pub data: Arc<Vec<u8>>,
119 pub target_addr: SocketAddr,
120}
121
122impl UdpSenderCmd {
123 pub fn new(data: Arc<Vec<u8>>, addr: SocketAddr) -> Self {
124 Self {
125 data,
126 target_addr: addr,
127 }
128 }
129}
130
131impl Handler<UdpSenderCmd> for UdpWorker {
132 type Result = Result<(), std::io::Error>;
133 fn handle(&mut self, msg: UdpSenderCmd, ctx: &mut Context<Self>) -> Self::Result {
134 log::info!("send instance info by udp,to addr:{}", &msg.target_addr);
135 let socket = self.socket.as_ref().unwrap().clone();
136 async move {
137 socket.send_to(&msg.data, msg.target_addr).await;
138 }
139 .into_actor(self)
140 .map(|_, _, _| {})
141 .spawn(ctx);
142 Ok(())
143 }
144}
145
146#[derive(Message)]
147#[rtype(result = "Result<UdpWorkerResult,std::io::Error>")]
148pub enum UdpWorkerCmd {
149 SetListenerAddr(Addr<InnerNamingListener>),
150 Close,
151}
152
/// Reply produced by the `UdpWorker` for a `UdpWorkerCmd`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UdpWorkerResult {
    /// The command was handled and carries no data back.
    None,
}
156
157impl Handler<UdpWorkerCmd> for UdpWorker {
158 type Result = Result<UdpWorkerResult, std::io::Error>;
159
160 fn handle(&mut self, msg: UdpWorkerCmd, ctx: &mut Self::Context) -> Self::Result {
161 match msg {
162 UdpWorkerCmd::Close => {
163 log::info!("UdpWorker close");
164 self.addr = None;
165 self.socket = None;
166 ctx.stop();
167 }
168 UdpWorkerCmd::SetListenerAddr(addr) => {
169 self.addr = Some(addr);
170 }
171 };
172 Ok(UdpWorkerResult::None)
173 }
174}