1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
extern crate websocket;

use broadcaster;
use control_nexus::{ControlInfo, ControlNexus, ControlUpdateProcessor};
use controls;
use json;
use serde_json::Value;
use std::sync::{Arc, Mutex};
use std::thread;
use websocket::message::Message;
use websocket::sync::{Client, Server};
use websocket::OwnedMessage;

pub fn start<'a>(
  guistring: &str,
  cup: Box<dyn ControlUpdateProcessor>,
  ip: &str,
  websockets_port: &str,
  block: bool,
) -> Result<ControlNexus, Box<dyn std::error::Error>> {
  let guival = serde_json::from_str(guistring)?;

  let root = try!(json::deserialize_root(&guival));

  println!(
    "starting websocket server for control page: {} ",
    root.title
  );

  // from control tree, make a map of ids->controls.
  let mapp = controls::make_control_map(&*root.root_control);
  let cnm = controls::control_map_to_name_map(&mapp);

  let ci = ControlInfo {
    cm: mapp,
    cnm: cnm,
    guijson: String::new() + guistring,
  };

  let cmshare = Arc::new(Mutex::new(ci));
  let bc = broadcaster::Broadcaster::new();

  let cn_ret = ControlNexus {
    ci: cmshare,
    bc: bc,
  };
  let cn_ws = cn_ret.clone();

  let mut websockets_ip = String::from(ip);
  websockets_ip.push_str(":");
  websockets_ip.push_str(&websockets_port);

  if block {
    match websockets_main(websockets_ip, cn_ws, Arc::new(Mutex::new(cup))) {
      Ok(_) => (),
      Err(e) => println!("error in websockets_main: {:?}", e),
    }
  } else {
    // Spawn a thread for the websockets handler.
    thread::spawn(
      move || match websockets_main(websockets_ip, cn_ws, Arc::new(Mutex::new(cup))) {
        Ok(_) => (),
        Err(e) => println!("error in websockets_main: {:?}", e),
      },
    );
  }

  Ok(cn_ret)
}

fn sendcontrols(
  ci: &Arc<Mutex<ControlInfo>>,
  client: &mut Client<std::net::TcpStream>,
) -> Result<(), Box<dyn std::error::Error>> {
  let sci = ci.lock().unwrap();

  let updarray = controls::cm_to_update_array(&sci.cm);

  // build json message containing both guijson and the updarray.
  let mut updvals = Vec::new();

  for upd in updarray {
    let um = json::encode_update_message(&upd);
    updvals.push(um);
  }

  let mut guival: Value = serde_json::from_str(&sci.guijson[..])?;

  match guival.as_object_mut() {
    Some(obj) => {
      obj.insert("state".to_string(), Value::Array(updvals));
      ()
    }
    None => (),
  }

  let guistring = serde_json::ser::to_string(&guival)?;
  let message = Message::text(guistring);
  client.send_message(&message)?;
  Ok(())
}

fn websockets_main(
  ipaddr: String,
  acn: ControlNexus,
  cup: Arc<Mutex<Box<dyn ControlUpdateProcessor>>>,
) -> Result<(), Box<dyn std::error::Error>> {
  println!("websockets address {:?}", ipaddr);
  let server = Server::bind(&ipaddr[..])?;
  for request in server.filter_map(Result::ok) {
    let mut scn = acn.clone();
    let cup = cup.clone();
    // Spawn a new thread for each connection.
    thread::spawn(move || {
      if !request.protocols().contains(&"rust-websocket".to_string()) {
        request.reject().unwrap();
        return;
      }

      let mut client: Client<std::net::TcpStream> =
        request.use_protocol("rust-websocket").accept().unwrap();

      let ip = client.peer_addr().unwrap();
      println!("Connection from {}", ip);

      // send up controls.
      match sendcontrols(&scn.ci, &mut client) {
        Err(e) => println!("error sending controls to client: {}", e),
        Ok(_) => (),
      }

      //(websocket::receiver::Reader<std::net::TcpStream>, websocket::sender::Writer<std::net::TcpStream> )
      let (mut receiver, sender) = client.split().unwrap();

      let sendmeh = Arc::new(Mutex::new(sender));
      scn.bc.register(sendmeh.clone());

      for message in receiver.incoming_messages() {
        let message = message.unwrap();

        match message {
          OwnedMessage::Close(_) => {
            let message = OwnedMessage::Close(None);
            let mut s = sendmeh.lock().unwrap();
            s.send_message(&message).unwrap();
            println!("Client {} disconnected", ip);
            return;
          }
          OwnedMessage::Ping(ping) => {
            let message = OwnedMessage::Pong(ping);
            let mut s = sendmeh.lock().unwrap();
            s.send_message(&message).unwrap();
          }
          OwnedMessage::Text(txt) => match serde_json::from_str(txt.as_str()) {
            Err(e) => println!("error {:?}", e),
            Ok(jsonval) => {
              let s_um = json::decode_update_message(&jsonval);
              match s_um {
                Some(updmsg) => {
                  {
                    let mut sci = scn.ci.lock().unwrap();
                    {
                      let mbcntrl = sci.cm.get_mut(controls::get_um_id(&updmsg));
                      match mbcntrl {
                        Some(cntrl) => {
                          (*cntrl).update(&updmsg);
                          scn.bc.broadcast_others(&ip, Message::text(txt.clone()));
                          ()
                        }
                        None => println!("none"),
                      }
                    }
                  }

                  let mut scup = cup.lock().unwrap();
                  scup.on_update_received(&updmsg, &mut scn);
                }
                _ => println!("decode_update_message failed on websockets msg: {:?}", txt),
              }
            }
          },
          _ => {
            println!("unrecognized message type: {:?}", message);
          }
        }
      }
    });
  }

  Ok(())
}