1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use websocket::{Message, OwnedMessage};
use websocket::client::ClientBuilder;
use std::sync::mpsc::Sender;
use chan::Channel;
use serde_json;
use message::{Message as PhoenixMessage};
pub struct Phoenix {
tx: Sender<OwnedMessage>,
count: u8,
channels: Arc<Mutex<Vec<Arc<Mutex<Channel>>>>>,
pub out: Receiver<PhoenixMessage>,
}
impl Phoenix {
pub fn new(url: &str) -> Phoenix {
let client = ClientBuilder::new(&format!("{}/websocket", url))
.unwrap()
.connect_insecure()
.unwrap();
let (mut receiver, mut sender) = client.split().unwrap();
let (tx, rx) = channel();
let tx_1 = tx.clone();
thread::spawn(move || {
loop {
let message = match rx.recv() {
Ok(m) => {
m
},
Err(e) => {
println!("Send Loop: {:?}", e);
return;
}
};
match message {
OwnedMessage::Close(_) => {
let _ = sender.send_message(&message);
return;
}
_ => (),
}
match sender.send_message(&message) {
Ok(()) => (),
Err(e) => {
let _ = sender.send_message(&Message::close());
return;
}
}
}
});
let channels: Arc<Mutex<Vec<Arc<Mutex<Channel>>>>> = Arc::new(Mutex::new(vec![]));
let channels_1 = channels.clone();
let (send, recv) = channel();
thread::spawn(move || {
for message in receiver.incoming_messages() {
let message = match message {
Ok(m) => m,
Err(e) => {
println!("Receive Loop: {:?}", e);
let _ = tx_1.send(OwnedMessage::Close(None));
return;
}
};
match message {
OwnedMessage::Close(_) => {
let _ = tx_1.send(OwnedMessage::Close(None));
return;
}
OwnedMessage::Ping(data) => {
match tx_1.send(OwnedMessage::Pong(data)) {
Ok(()) => (),
Err(e) => {
println!("Receive Loop: {:?}", e);
return;
}
}
}
OwnedMessage::Text(data) => {
let v: PhoenixMessage = serde_json::from_str(&data).unwrap();
send.send(v);
},
_ => println!("Receive Loop: {:?}", message)
}
}
});
Phoenix{
tx: tx.clone(),
count: 0,
channels: channels.clone(),
out: recv,
}
}
pub fn channel(&mut self, topic: &str) -> Arc<Mutex<Channel>> {
self.count = self.count+1;
let chan = Arc::new(Mutex::new(Channel::new(topic, self.tx.clone(), &format!("{}", self.count))));
let mut channels = self.channels.lock().unwrap();
channels.push(chan.clone());
chan
}
}