1use std::{
2 io::ErrorKind, os::unix::net::UnixDatagram, sync::Mutex, thread::sleep,
3 time::Duration,
4};
5
6use crate::models::{Binary, OrderHead, Reply};
7use crate::{ErrorCode, SystemError};
8
/// Client endpoint that submits binary orders to a server over a Unix
/// datagram socket and reads back one reply per order.
pub struct Taker {
    /// Local datagram socket; bound in `init` and pointed at `server` by `connect`.
    conn: UnixDatagram,
    /// Address (socket path) of the server that `connect` attaches `conn` to.
    server: String,
    /// Number of orders submitted so far; the value after incrementing is
    /// written into the outgoing order header as its id.
    count: Mutex<u64>,
    /// Running (wrapping) sum of the `elapsed` values reported in reply
    /// headers; reset to zero by `clear_elapsed`.
    elapsed: Mutex<u64>,
}
17
18impl Taker {
19 pub fn init(server: &str, path: &str) -> std::io::Result<Self> {
20 let _ = std::fs::remove_file(path);
21 let conn = UnixDatagram::bind(path)?;
22 conn.set_read_timeout(Some(Duration::from_secs(5)))?;
23 conn.set_write_timeout(Some(Duration::from_secs(5)))?;
24 Ok(Self {
25 conn,
26 server: server.to_string(),
27 count: Mutex::new(0),
28 elapsed: Mutex::new(0),
29 })
30 }
31
32 pub fn connect(&self) -> std::io::Result<()> {
33 self.conn.connect(&self.server)?;
34 Ok(())
35 }
36
37 pub fn clear_elapsed(&self) {
38 let mut elapsed = self.elapsed.lock().unwrap();
39 *elapsed = 0;
40 }
41
42 pub fn elapsed(&self) -> u64 {
43 *self.elapsed.lock().unwrap()
44 }
45
46 pub fn count(&self) -> u64 {
47 *self.count.lock().unwrap()
48 }
49
50 pub fn take(&self, order: &mut [u8]) -> Result<Reply, ErrorCode> {
59 let mut reply = Reply::default();
60 let mut count = self.count.lock().unwrap();
63 let mut elapsed = self.elapsed.lock().unwrap();
64
65 *count = count.wrapping_add(1);
66 let order_head = OrderHead::from_binary_mut(&mut order[..OrderHead::S]);
67 order_head.id = *count;
68
69 if let Err(e) = self.conn.send(order) {
70 log::error!("send error: {e:#?}");
71 match e.kind() {
72 ErrorKind::NotConnected | ErrorKind::ConnectionRefused => {
73 let mut did_send = false;
74 for i in 0..3 {
75 log::info!("reconnect try: {i}");
76 if self.connect().is_ok() {
77 self.conn.send(order)?;
78 did_send = true;
79 break;
80 }
81 sleep(Duration::from_secs(2));
82 }
83 if !did_send {
84 return Err(SystemError::SendTimeOut)?;
85 }
86 }
87 _ => return Err(e)?,
88 }
89 }
90 self.conn.recv(reply.as_binary_mut())?;
91 *elapsed = elapsed.wrapping_add(reply.head.elapsed);
92
93 if reply.head.id != *count {
94 return Err(SystemError::BadOrderId)?;
95 }
96
97 if reply.head.error != 0 {
102 return Err(ErrorCode::from_u32(reply.head.error));
103 }
104
105 Ok(reply)
113 }
114}