shah/
taker.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::{
    io::ErrorKind, os::unix::net::UnixDatagram, sync::Mutex, thread::sleep,
    time::Duration,
};

use crate::{error::SystemError, Binary, ErrorCode, OrderHead, Reply};

/// Order Taker
pub struct Taker {
    conn: UnixDatagram,
    // reply: [u8; 1024 * 64],
    server: String,
    count: Mutex<u64>,
}

impl Taker {
    pub fn init(server: &str, path: &str) -> std::io::Result<Self> {
        let _ = std::fs::remove_file(path);
        let conn = UnixDatagram::bind(path)?;
        conn.set_read_timeout(Some(Duration::from_secs(5)))?;
        conn.set_write_timeout(Some(Duration::from_secs(5)))?;
        Ok(Self { conn, server: server.to_string(), count: Mutex::new(0) })
    }

    pub fn connect(&self) -> std::io::Result<()> {
        self.conn.connect(&self.server)?;
        Ok(())
    }

    // pub fn reply_head(&self) -> &ReplyHead {
    //     ReplyHead::from_binary(&self.reply[0..ReplyHead::S])
    // }
    //
    // pub fn reply_body(&self, size: usize) -> &[u8] {
    //     &self.reply[ReplyHead::S..ReplyHead::S + size]
    // }

    pub fn take(&self, order: &mut [u8]) -> Result<Reply, ErrorCode> {
        let mut reply = Reply::default();
        // self.reply[0..ReplyHead::S].fill(0);

        let mut count = self.count.lock().unwrap();
        if *count == u64::MAX {
            *count = 0;
        }
        *count += 1;
        let order_head = OrderHead::from_binary_mut(order);
        order_head.id = *count;

        if let Err(e) = self.conn.send(order) {
            log::error!("send error: {e:#?}");
            match e.kind() {
                ErrorKind::NotConnected | ErrorKind::ConnectionRefused => {
                    for i in 0..3 {
                        log::info!("reconnect try: {i}");
                        if self.connect().is_ok() {
                            self.conn.send(order)?;
                            break;
                        }
                        sleep(Duration::from_secs(2));
                    }
                }
                _ => Err(e)?,
            }
        }
        self.conn.recv(reply.as_binary_mut())?;

        if reply.head.id != *count {
            Err(SystemError::BadOrderId)?;
        }

        // let order_head = OrderHead::from_binary(order);
        // assert_eq!(reply.head.scope, order_head.scope);
        // assert_eq!(reply.head.route, order_head.route);

        if reply.head.error != 0 {
            return Err(ErrorCode::from_u32(reply.head.error));
        }

        // self.conn.recv(&mut reply.body)?;

        // let (reply_head, reply_body) = reply.split_at(ReplyHead::S);
        // let reply_head = self.reply_head();
        // let reply_head = ReplyHead::from_binary(&reply[0..ReplyHead::S]);
        // let reply_body = &reply[ReplyHead::S..ReplyHead::S + size];

        Ok(reply)
    }
}