rnacos/naming/
udp_actor.rs

1#![allow(unused_imports, unused_must_use)]
2use actix::prelude::*;
3use std::borrow::Cow;
4use std::env;
5use std::error::Error;
6use std::io::stdin;
7use std::net::SocketAddr;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::net::UdpSocket;
11use tokio::signal;
12use tokio::sync::Mutex;
13
14use super::listener::{InnerNamingListener, NamingListenerCmd};
15
/// Size in bytes of the receive buffer handed to `recv_from`.
///
/// Written as its derivation: the 16-bit length limit (65,535) minus the
/// 8-byte UDP header and the 20-byte IPv4 header, i.e. 65,507.
const MAX_DATAGRAM_SIZE: usize = 65_535 - 8 - 20;
17pub struct UdpWorker {
18    local_addr_str: Option<String>,
19    socket: Option<Arc<UdpSocket>>,
20    addr: Option<Addr<InnerNamingListener>>,
21    udp_port: u16,
22    buf: Option<Vec<u8>>,
23}
24
25impl UdpWorker {
26    pub fn new(addr: Option<Addr<InnerNamingListener>>) -> Self {
27        Self {
28            local_addr_str: None,
29            socket: None,
30            addr,
31            udp_port: 0,
32            buf: Some(vec![]),
33        }
34    }
35
36    pub fn new_with_socket(socket: UdpSocket, addr: Option<Addr<InnerNamingListener>>) -> Self {
37        let local_addr = socket.local_addr().unwrap();
38        let udp_port = local_addr.port();
39        Self {
40            local_addr_str: None,
41            socket: Some(Arc::new(socket)),
42            addr,
43            udp_port,
44            buf: Some(vec![]),
45        }
46    }
47
48    fn init(&mut self, ctx: &mut actix::Context<Self>) {
49        self.init_socket(ctx);
50        //self.init_loop_recv(ctx);
51    }
52
53    fn init_socket(&mut self, ctx: &mut actix::Context<Self>) {
54        if self.socket.is_some() {
55            self.init_loop_recv(ctx);
56            return;
57        }
58        let local_addr_str = if let Some(addr) = self.local_addr_str.as_ref() {
59            addr.to_owned()
60        } else {
61            "0.0.0.0:0".to_owned()
62        };
63        async move { UdpSocket::bind(&local_addr_str).await.unwrap() }
64            .into_actor(self)
65            .map(|r, act, ctx| {
66                act.udp_port = r.local_addr().unwrap().port();
67                act.socket = Some(Arc::new(r));
68                act.init_loop_recv(ctx);
69            })
70            .wait(ctx);
71    }
72
73    fn init_loop_recv(&mut self, ctx: &mut actix::Context<Self>) {
74        let socket = self.socket.as_ref().unwrap().clone();
75        let notify_addr = self.addr.clone();
76        let buf = self.buf.replace(Vec::new());
77        async move {
78            let mut buf = buf.unwrap_or_default();
79            if buf.len() < MAX_DATAGRAM_SIZE {
80                buf = vec![0u8; MAX_DATAGRAM_SIZE];
81            }
82            while let Ok((_len, addr)) = socket.recv_from(&mut buf).await {
83                //let mut data:Vec<u8> = Vec::with_capacity(len);
84                //let mut data: Vec<u8> = vec![0u8; len];
85                //data.clone_from_slice(&buf[..len]);
86                let msg = NamingListenerCmd::Response(addr.to_owned());
87                //let s=String::from_utf8_lossy(&buf[..len]);
88                //println!("rece from:{} | len:{} | str:{}",&addr,len,s);
89                if let Some(_notify_addr) = &notify_addr {
90                    _notify_addr.do_send(msg);
91                }
92            }
93            buf
94        }
95        .into_actor(self)
96        .map(|buf, act, ctx| {
97            act.buf.replace(buf);
98            ctx.run_later(Duration::from_secs(1), |act, ctx| {
99                act.init_loop_recv(ctx);
100            });
101        })
102        .spawn(ctx);
103    }
104}
105
106impl Actor for UdpWorker {
107    type Context = Context<Self>;
108
109    fn started(&mut self, ctx: &mut Self::Context) {
110        log::info!(" UdpWorker started");
111        self.init(ctx);
112    }
113}
114
115#[derive(Debug, Message)]
116#[rtype(result = "Result<(),std::io::Error>")]
117pub struct UdpSenderCmd {
118    pub data: Arc<Vec<u8>>,
119    pub target_addr: SocketAddr,
120}
121
122impl UdpSenderCmd {
123    pub fn new(data: Arc<Vec<u8>>, addr: SocketAddr) -> Self {
124        Self {
125            data,
126            target_addr: addr,
127        }
128    }
129}
130
131impl Handler<UdpSenderCmd> for UdpWorker {
132    type Result = Result<(), std::io::Error>;
133    fn handle(&mut self, msg: UdpSenderCmd, ctx: &mut Context<Self>) -> Self::Result {
134        log::info!("send instance info by udp,to addr:{}", &msg.target_addr);
135        let socket = self.socket.as_ref().unwrap().clone();
136        async move {
137            socket.send_to(&msg.data, msg.target_addr).await;
138        }
139        .into_actor(self)
140        .map(|_, _, _| {})
141        .spawn(ctx);
142        Ok(())
143    }
144}
145
146#[derive(Message)]
147#[rtype(result = "Result<UdpWorkerResult,std::io::Error>")]
148pub enum UdpWorkerCmd {
149    SetListenerAddr(Addr<InnerNamingListener>),
150    Close,
151}
152
/// Reply payload for `UdpWorkerCmd`.
///
/// Derives the common value traits so callers can log, copy and compare
/// replies (and use `Result` helpers that require `Debug` on the `Ok` type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UdpWorkerResult {
    /// The command carries no data back to the caller.
    None,
}
156
157impl Handler<UdpWorkerCmd> for UdpWorker {
158    type Result = Result<UdpWorkerResult, std::io::Error>;
159
160    fn handle(&mut self, msg: UdpWorkerCmd, ctx: &mut Self::Context) -> Self::Result {
161        match msg {
162            UdpWorkerCmd::Close => {
163                log::info!("UdpWorker close");
164                self.addr = None;
165                self.socket = None;
166                ctx.stop();
167            }
168            UdpWorkerCmd::SetListenerAddr(addr) => {
169                self.addr = Some(addr);
170            }
171        };
172        Ok(UdpWorkerResult::None)
173    }
174}