Skip to main content

shah/
taker.rs

1use std::{
2    io::ErrorKind, os::unix::net::UnixDatagram, sync::Mutex, thread::sleep,
3    time::Duration,
4};
5
6use crate::models::{Binary, OrderHead, Reply};
7use crate::{ErrorCode, SystemError};
8
/// Order taker: a client that submits binary orders over a Unix datagram
/// socket and keeps running totals about the orders it has sent.
pub struct Taker {
    /// Local datagram socket that orders are sent on and replies are read from.
    conn: UnixDatagram,
    /// Filesystem path of the server socket that `conn` gets connected to.
    server: String,
    /// Number of orders taken so far; the current value is also stamped into
    /// each outgoing order header as its id.
    count: Mutex<u64>,
    /// Running (wrapping) sum of the `elapsed` value reported in reply headers.
    elapsed: Mutex<u64>,
}
17
18impl Taker {
19    pub fn init(server: &str, path: &str) -> std::io::Result<Self> {
20        let _ = std::fs::remove_file(path);
21        let conn = UnixDatagram::bind(path)?;
22        conn.set_read_timeout(Some(Duration::from_secs(5)))?;
23        conn.set_write_timeout(Some(Duration::from_secs(5)))?;
24        Ok(Self {
25            conn,
26            server: server.to_string(),
27            count: Mutex::new(0),
28            elapsed: Mutex::new(0),
29        })
30    }
31
32    pub fn connect(&self) -> std::io::Result<()> {
33        self.conn.connect(&self.server)?;
34        Ok(())
35    }
36
37    pub fn clear_elapsed(&self) {
38        let mut elapsed = self.elapsed.lock().unwrap();
39        *elapsed = 0;
40    }
41
42    pub fn elapsed(&self) -> u64 {
43        *self.elapsed.lock().unwrap()
44    }
45
46    pub fn count(&self) -> u64 {
47        *self.count.lock().unwrap()
48    }
49
50    // pub fn reply_head(&self) -> &ReplyHead {
51    //     ReplyHead::from_binary(&self.reply[0..ReplyHead::S])
52    // }
53    //
54    // pub fn reply_body(&self, size: usize) -> &[u8] {
55    //     &self.reply[ReplyHead::S..ReplyHead::S + size]
56    // }
57
58    pub fn take(&self, order: &mut [u8]) -> Result<Reply, ErrorCode> {
59        let mut reply = Reply::default();
60        // self.reply[0..ReplyHead::S].fill(0);
61
62        let mut count = self.count.lock().unwrap();
63        let mut elapsed = self.elapsed.lock().unwrap();
64
65        *count = count.wrapping_add(1);
66        let order_head = OrderHead::from_binary_mut(&mut order[..OrderHead::S]);
67        order_head.id = *count;
68
69        if let Err(e) = self.conn.send(order) {
70            log::error!("send error: {e:#?}");
71            match e.kind() {
72                ErrorKind::NotConnected | ErrorKind::ConnectionRefused => {
73                    let mut did_send = false;
74                    for i in 0..3 {
75                        log::info!("reconnect try: {i}");
76                        if self.connect().is_ok() {
77                            self.conn.send(order)?;
78                            did_send = true;
79                            break;
80                        }
81                        sleep(Duration::from_secs(2));
82                    }
83                    if !did_send {
84                        return Err(SystemError::SendTimeOut)?;
85                    }
86                }
87                _ => return Err(e)?,
88            }
89        }
90        self.conn.recv(reply.as_binary_mut())?;
91        *elapsed = elapsed.wrapping_add(reply.head.elapsed);
92
93        if reply.head.id != *count {
94            return Err(SystemError::BadOrderId)?;
95        }
96
97        // let order_head = OrderHead::from_binary(order);
98        // assert_eq!(reply.head.scope, order_head.scope);
99        // assert_eq!(reply.head.route, order_head.route);
100
101        if reply.head.error != 0 {
102            return Err(ErrorCode::from_u32(reply.head.error));
103        }
104
105        // self.conn.recv(&mut reply.body)?;
106
107        // let (reply_head, reply_body) = reply.split_at(ReplyHead::S);
108        // let reply_head = self.reply_head();
109        // let reply_head = ReplyHead::from_binary(&reply[0..ReplyHead::S]);
110        // let reply_body = &reply[ReplyHead::S..ReplyHead::S + size];
111
112        Ok(reply)
113    }
114}