1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use websocket::{Message, OwnedMessage};
use websocket::client::ClientBuilder;
use std::sync::mpsc::Sender;
use chan::Channel;
use serde_json;
use message::{Message as PhoenixMessage};

pub struct Phoenix {
	tx: Sender<OwnedMessage>,
	count: u8,
	channels: Arc<Mutex<Vec<Arc<Mutex<Channel>>>>>,
	pub out: Receiver<PhoenixMessage>,
}

impl Phoenix {
	pub fn new(url: &str) -> Phoenix {
		let client = ClientBuilder::new(&format!("{}/websocket", url))
			.unwrap()
			.connect_insecure()
			.unwrap();

		let (mut receiver, mut sender) = client.split().unwrap();

		let (tx, rx) = channel();

		let tx_1 = tx.clone();

		thread::spawn(move || {
			loop {
				// Send loop
				let message = match rx.recv() {
					Ok(m) => {
						//println!("Send Loop: {:?}", m);
						m
					},
					Err(e) => {
						println!("Send Loop: {:?}", e);
						return;
					}
				};
				match message {
					OwnedMessage::Close(_) => {
						let _ = sender.send_message(&message);
						// If it's a close message, just send it and then return.
						return;
					}
					_ => (),
				}
				// Send the message
				match sender.send_message(&message) {
					Ok(()) => (),
					Err(e) => {
						//println!("Send Loop: {:?}", e);
						let _ = sender.send_message(&Message::close());
						return;
					}
				}
			}
		});

		let channels: Arc<Mutex<Vec<Arc<Mutex<Channel>>>>> = Arc::new(Mutex::new(vec![]));
		let channels_1 = channels.clone();

		let (send, recv) = channel();

		thread::spawn(move || {
			// Receive loop
			for message in receiver.incoming_messages() {
				let message = match message {
					Ok(m) => m,
					Err(e) => {
						println!("Receive Loop: {:?}", e);
						let _ = tx_1.send(OwnedMessage::Close(None));
						return;
					}
				};

				match message {
					OwnedMessage::Close(_) => {
						// Got a close message, so send a close message and return
						let _ = tx_1.send(OwnedMessage::Close(None));
						return;
					}
					
					OwnedMessage::Ping(data) => {
						match tx_1.send(OwnedMessage::Pong(data)) {
							// Send a pong in response
							Ok(()) => (),
							Err(e) => {
								println!("Receive Loop: {:?}", e);
								return;
							}
						}
					}

					// Say what we received
					OwnedMessage::Text(data) => {
						//println!("Receive Loop: {:?}", data);
						let v: PhoenixMessage = serde_json::from_str(&data).unwrap();
						send.send(v);
					},
					
					_ => println!("Receive Loop: {:?}", message)
				}
			}
		});


		Phoenix{
			tx: tx.clone(),
			count: 0,
			channels: channels.clone(),
			out: recv,
		}
	}

	pub fn channel(&mut self, topic: &str) -> Arc<Mutex<Channel>> {
		self.count = self.count+1;
		let chan = Arc::new(Mutex::new(Channel::new(topic, self.tx.clone(), &format!("{}", self.count))));
		let mut channels = self.channels.lock().unwrap();
		channels.push(chan.clone());
		chan
	}
}