1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use lazy_static::lazy_static;
use regex::Regex;
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::broadcast::{Receiver, Sender};
// `lazy_static!` builds the regex on first access and reuses it afterwards,
// so the pattern is not recompiled for every parsed line.
lazy_static! {
// Matches one `key: value` line. Capture 1 is everything before the first
// colon (possibly empty); capture 2 is the rest of the line after the colon
// and any spaces that directly follow it. The `unwrap` can only fail if the
// literal pattern itself is invalid.
static ref TAG_PATTERN: Regex = Regex::new(r"^([^:]*): *(.*)$").unwrap();
}
/// A single `key: value` pair, i.e. one line of a packet.
#[derive(Debug, Clone, PartialOrd, PartialEq)]
pub struct Tag {
/// Name of the tag; when parsed from a line this is the text before the first `:`.
pub key: String,
/// Value of the tag; when parsed from a line this is the text after the `:`
/// with the spaces directly following the colon removed.
pub value: String,
}
/// An ordered list of tags. On the wire a packet is its tags written one per
/// line and terminated by a blank line.
pub type Packet = Vec<Tag>;
/// Connects to an AMI server (presumably the Asterisk Manager Interface —
/// confirm against the caller) and shuttles packets between the socket and
/// the given broadcast channels until the server closes the connection.
///
/// * `server` - address to connect to.
/// * `cmd_rx` - packets received here are serialized and written to the server.
/// * `resp_tx` - receives each completed response as a list of packets. A
///   response whose `EventList` tag is `start` keeps collecting packets until
///   a packet with `EventList: Complete` arrives; any other response is a
///   single-packet list.
/// * `event_tx` - receives packets whose first tag is `Event`, unless they
///   arrive while a response sequence is being collected (those are added to
///   the response instead).
///
/// Blank lines that do not terminate any tags are ignored, so no empty packet
/// is ever published.
///
/// # Errors
///
/// Returns an error if connecting, reading or writing fails, if a received
/// line is not valid UTF-8, if receiving from `cmd_rx` fails, or if sending
/// on `resp_tx` / `event_tx` fails. Returns `Ok(())` once the server closes
/// the connection.
pub async fn ami_connect<A: ToSocketAddrs>(
    server: A,
    mut cmd_rx: Receiver<Packet>,
    resp_tx: Sender<Vec<Packet>>,
    event_tx: Sender<Packet>,
) -> Result<(), Box<dyn Error>> {
    let socket = TcpStream::connect(server).await?;
    let mut reader = BufReader::new(socket);

    // The first line sent by the server is read and discarded.
    let mut greeting = String::new();
    reader.read_line(&mut greeting).await?;

    let mut response: Vec<Packet> = vec![];
    let mut in_packet: Packet = vec![];
    // Raw bytes of the line currently being received. `read_until` is used
    // instead of `read_line` because the read races against `cmd_rx.recv()`
    // in `select!`: a cancelled `read_until` leaves the partially read bytes
    // in the buffer, whereas a cancelled `read_line` may lose them.
    let mut raw_line: Vec<u8> = Vec::new();
    let mut in_response_sequence = false;

    loop {
        tokio::select! {
            bytes_read = reader.read_until(b'\n', &mut raw_line) => {
                if bytes_read? == 0 {
                    // End of stream: the server closed the connection.
                    break;
                }
                let trimmed_line = std::str::from_utf8(&raw_line)?.trim();
                if !trimmed_line.is_empty() {
                    // Lines that do not look like `key: value` are skipped.
                    if let Some(tag) = line_to_tag(trimmed_line) {
                        in_packet.push(tag);
                    }
                } else if !in_packet.is_empty() {
                    // A blank line terminates the packet collected so far.
                    // Taking it leaves `in_packet` empty for the next one.
                    let packet = std::mem::take(&mut in_packet);
                    let is_event = packet[0].key.eq_ignore_ascii_case("Event");
                    if !in_response_sequence && is_event {
                        event_tx.send(packet)?;
                    } else {
                        if let Some(tag) = find_tag(&packet, "EventList") {
                            if tag.value.eq_ignore_ascii_case("start") {
                                in_response_sequence = true;
                            } else if tag.value.eq_ignore_ascii_case("Complete") {
                                in_response_sequence = false;
                            }
                        }
                        response.push(packet);
                        if !in_response_sequence {
                            // Hand over the whole response and start afresh.
                            resp_tx.send(std::mem::take(&mut response))?;
                        }
                    }
                }
                raw_line.clear();
            }
            pkt = cmd_rx.recv() => {
                let cmd = packet_to_string(&pkt?);
                // A blank line marks the end of the command on the wire.
                let chunk = format!("{}\r\n\r\n", cmd);
                reader.write_all(chunk.as_bytes()).await?;
            }
        }
    }
    Ok(())
}
/// Returns the first tag in `pkt` whose key equals `key`, compared without
/// regard to ASCII case, or `None` when no tag matches.
pub fn find_tag<'a>(pkt: &'a Packet, key: &str) -> Option<&'a Tag> {
    for candidate in pkt {
        if candidate.key.eq_ignore_ascii_case(key) {
            return Some(candidate);
        }
    }
    None
}
/// Parses one `key: value` line into a [`Tag`].
///
/// Returns `None` when the line does not match `TAG_PATTERN`.
fn line_to_tag(line: &str) -> Option<Tag> {
    TAG_PATTERN.captures(line).map(|caps| Tag {
        key: caps[1].to_owned(),
        value: caps[2].to_owned(),
    })
}
fn packet_to_string(pkt: &Packet) -> String {
pkt.iter()
.map(|Tag { key, value }| format!("{}: {}", key, value))
.collect::<Vec<String>>()
.join("\r\n")
}
#[cfg(test)]
mod tests {
    /// Placeholder sanity check; it does not exercise any code in this file.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}