1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Reads and processes messages from the TCP socket
use std::io::Read;
use std::net::Shutdown;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use log::*;
use super::streamer::Streamer;
use crate::core::errors::IBKRApiLibError;
use crate::core::messages::read_msg;
//==================================================================================================
pub struct Reader {
stream: Box<dyn Streamer + 'static>,
messages: Sender<String>,
disconnect_requested: Arc<AtomicBool>,
is_connected: bool,
}
impl Reader {
pub fn new(
stream: Box<impl Streamer + 'static>,
messages: Sender<String>,
disconnect_requested: Arc<AtomicBool>,
) -> Self {
Reader {
stream,
messages,
disconnect_requested,
is_connected: true,
}
}
//----------------------------------------------------------------------------------------------
pub fn recv_packet(&mut self) -> Result<Vec<u8>, IBKRApiLibError> {
//debug!("_recv_all_msg");
let buf = self._recv_all_msg()?;
// receiving 0 bytes outside a timeout means the connection is either
// closed or broken
if buf.len() == 0 {
if !self.disconnect_requested.load(Ordering::Acquire) {
info!("socket either closed or broken, disconnecting");
self.stream.shutdown(Shutdown::Both)?;
self.is_connected = false;
}
}
Ok(buf)
}
//----------------------------------------------------------------------------------------------
fn _recv_all_msg(&mut self) -> Result<Vec<u8>, IBKRApiLibError> {
let mut cont = true;
let mut allbuf: Vec<u8> = Vec::new();
const NUM_BYTES: usize = 4096;
while cont {
let mut buf: [u8; NUM_BYTES] = [0; NUM_BYTES];
let bytes_read = self
.stream
.read(&mut buf)
.expect("Couldnt read from reader...");
allbuf.extend_from_slice(&buf[0..bytes_read]);
//logger.debug("len %d raw:%s|", len(buf), buf)
if bytes_read < NUM_BYTES {
cont = false;
}
}
Ok(allbuf)
}
//----------------------------------------------------------------------------------------------
fn process_reader_msgs(&mut self) -> Result<(), IBKRApiLibError> {
// grab a packet of messages from the socket
let mut message_packet = self.recv_packet()?;
//debug!(" recvd size {}", message_packet.len());
// Read messages from the packet until there are no more.
// When this loop ends, break into the outer loop and grab another packet.
// Repeat until the connection is closed
//
let _msg = String::new();
while message_packet.len() > 0 {
// Read a message from the packet then add it to the message queue below.
let (_size, msg, remaining_messages) = read_msg(message_packet.as_slice())?;
// clear the Vec that holds the bytes from the packet
// and reload with the bytes that haven't been read.
// The variable remaining_messages only holds the unread messagesleft in the packet
message_packet.clear();
message_packet.extend_from_slice(remaining_messages.as_slice());
if msg.as_str() != "" {
self.messages.send(msg).expect("READER CANNOT SEND MESSAGE");
} else {
//Break to the outer loop in run and get another packet of messages.
debug!("more incoming packet(s) are needed ");
break;
}
}
Ok(())
}
//----------------------------------------------------------------------------------------------
pub fn run(&mut self) {
debug!("starting reader loop");
loop {
if self.disconnect_requested.load(Ordering::Acquire) || !self.is_connected {
return;
}
let result = self.process_reader_msgs();
if !result.is_err() {
continue;
}
error!("{:?}", result);
}
}
}