1use chrono::prelude::*;
2use crate::{Object, Command, VERSION_STRING};
3use crate::patterns::Pattern;
4use crate::server::logger::{Logger, LogMessage};
5use crate::server::storage::Storage;
6use futures::channel::mpsc::{unbounded, UnboundedSender, UnboundedReceiver, TryRecvError};
7use futures::StreamExt;
8use serde_json::{Value, json};
9use std::collections::{HashMap, HashSet};
10use std::iter::FromIterator;
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13use uuid::Uuid;
14
15pub mod storage;
16pub mod json_rpc;
17pub mod http_transport;
18pub mod tcp_transport;
19pub mod config;
20pub mod logger;
21pub mod admin;
22
/// Errors reported by the server operations in this module.
#[derive(Error, Debug, PartialEq)]
pub enum Error {
    /// The object name is empty or starts with `$` (see `validate_object_name`).
    #[error("invalid object name")]
    InvalidObjectName,
    /// An event or invocation targeted an object that is not stored.
    #[error("object not found")]
    ObjectNotFound,
    /// A patch was attempted where the stored value or the patch is not a JSON object.
    #[error("object values not mergeable")]
    CantMergeObjects,
    /// The query id passed to `unsubscribe` does not belong to the client.
    #[error("query not found")]
    QueryNotFound,
    /// The client is not registered in the server state.
    #[error("client not found")]
    ClientNotFound,
    /// No connected client has an RPC-providing query tracking the object; also delivered
    /// to a caller whose pending invocation is dropped when the provider unsubscribes or
    /// disconnects.
    #[error("not invocable")]
    ObjectNotInvocable,
    /// `invoke_result` referenced an invocation the client has no record of.
    #[error("invocation not found")]
    InvocationNotFound,
}
40
41fn validate_object_name(name: &str) -> Result<(), Error> {
42 if name == "" || name.starts_with("$") {
43 Err(Error::InvalidObjectName)
44 } else {
45 Ok(())
46 }
47}
48
49fn merge_into_object(old: &mut Value, new: &Value) -> Result<(), Error> {
50 match (old, new) {
51 (&mut Value::Object(ref mut a), &Value::Object(ref b)) => {
52 for (k, v) in b {
53 a.insert(k.to_string(), v.clone());
54 }
55
56 Ok(())
57 },
58 _ => Err(Error::CantMergeObjects),
59 }
60}
61
/// Messages the server places into a client's inbox.
#[derive(Debug)]
pub enum Message {
    /// An object matching the query's pattern was set or patched and was not yet tracked
    /// by the query.
    QueryAdd {
        query_id: Uuid,
        object: Object,
    },
    /// An object already tracked by the query was set or patched.
    QueryChange {
        query_id: Uuid,
        object: Object,
    },
    /// An object tracked by the query was removed; `object` is the removed object.
    QueryRemove {
        query_id: Uuid,
        object: Object,
    },
    /// An event was emitted on an object tracked by the query.
    QueryEvent {
        query_id: Uuid,
        object: String,
        event: String,
        data: Value,
    },
    /// A method invocation routed to this client through one of its RPC-providing queries.
    /// The answer is submitted with `Server::invoke_result` using `invocation_id`.
    QueryInvocation {
        query_id: Uuid,
        invocation_id: Uuid,
        object: String,
        method: String,
        args: Value,
    },
    /// Outcome of an invocation this client started; `request_id` is the value the client
    /// passed to `Server::invoke`.
    InvocationResult {
        request_id: Value,
        result: Result<Value, Error>,
    },
}
94
/// A pending method invocation, stored on the client that has to answer it.
#[derive(Debug, Clone)]
struct Invocation {
    // Id generated by `State::invoke`; the responder echoes it in `invoke_result`.
    id: Uuid,
    // Id of the client that issued the invocation and receives the result.
    client_id: Uuid,
    // Request id supplied by the invoking client, returned in `Message::InvocationResult`.
    request_id: Value,
    // Id of the responder's query through which the invocation was routed.
    query_id: Uuid,
}
102
/// A subscription registered by a client through `Server::query`.
#[derive(Debug)]
struct Query {
    id: Uuid,
    // Pattern tested against object names when objects are set or patched.
    pattern: Pattern,
    // Whether invocations on tracked objects may be routed to the owning client.
    provide_rpc: bool,
    // Names of the objects currently tracked by this query.
    objects: HashSet<String>,
}
110
/// Server-side bookkeeping for one connected client.
#[derive(Debug)]
pub struct ClientState {
    #[allow(dead_code)]
    id: Uuid,
    // Subscriptions registered by this client.
    queries: Vec<Query>,
    // Invocations routed to this client that it has not answered yet.
    invocations: Vec<Invocation>,
    // Sending half of the client's inbox; the receiving half lives in `Client`.
    inbox_tx: UnboundedSender<Message>,
    // Commands executed by the server when this client disconnects.
    disconnect_commands: Vec<Command>,
}
120
/// Handle for a connected client, created by `Server::client_connect`.
///
/// Dropping the handle disconnects the client from the server.
pub struct Client {
    id: Uuid,
    // Handle to the server this client is registered with; used on drop.
    server: Server,
    // Receiving half of the inbox the server sends `Message`s to.
    inbox_rx: UnboundedReceiver<Message>,
}
126
impl Client {
    /// Waits for the next message in this client's inbox.
    ///
    /// Delegates to `StreamExt::next` on the inbox receiver.
    pub async fn inbox_next(&mut self) -> Option<Message> {
        self.inbox_rx.next().await
    }

    /// Polls the inbox without waiting.
    ///
    /// Delegates to `UnboundedReceiver::try_next`; the tests in this file rely on it
    /// returning `Err` while the inbox is currently empty.
    pub fn inbox_try_next(&mut self) -> Result<Option<Message>, TryRecvError> {
        self.inbox_rx.try_next()
    }
}
136
impl Drop for Client {
    /// Unregisters the client from its server when the handle goes out of scope.
    fn drop(&mut self) {
        self.server.client_disconnect(self.id);
    }
}
142
/// Handle to the server state; cloning yields another handle to the same shared state.
#[derive(Clone)]
pub struct Server {
    shared: Arc<Shared>,
}
147
/// Data shared by all `Server` handles; the whole state sits behind one mutex.
struct Shared {
    state: Mutex<State>,
}
151
/// Mutable server state guarded by the mutex in `Shared`.
struct State {
    // Stored objects, keyed by object name.
    objects: HashMap<String,Object>,
    // Connected clients, keyed by client id.
    clients: HashMap<Uuid,ClientState>,
    // Optional storage backend notified when objects are added, changed or removed.
    storage: Option<Box<dyn Storage + Send>>,
    // Receives every `LogMessage` produced by the server.
    logger: Box<dyn Logger + Send>,
}
158
159impl State {
160 fn set(&mut self, name: &str, value: Value, client_id: Uuid) -> Result<(), Error> {
161 let inserted: bool;
162
163 validate_object_name(name)?;
164
165 self.log(LogMessage::Set { object: name.to_string(), value: value.clone(), client: client_id });
166
167 if let Some(object) = self.objects.get_mut(name) {
168 object.value = value;
169 object.last_modified = Utc::now();
170 inserted = false;
171 } else {
172 self.objects.insert(name.to_string(), Object {
173 name: name.to_string(),
174 value,
175 last_modified: Utc::now(),
176 });
177 inserted = true;
178 }
179
180 let object = self.objects[name].clone();
181
182 if let Some(storage) = &self.storage {
183 if inserted {
184 storage.add_object(object.clone());
185 } else {
186 storage.change_object(object.clone());
187 }
188 }
189
190 for client in self.clients.values_mut() {
191 for query in &mut client.queries {
192 if query.pattern.matches_str(name) {
193 let msg = if query.objects.contains(name) {
194 Message::QueryChange {
195 query_id: query.id,
196 object: object.clone(),
197 }
198 } else {
199 query.objects.insert(name.to_string());
200 Message::QueryAdd {
201 query_id: query.id,
202 object: object.clone(),
203 }
204 };
205
206 let _ = client.inbox_tx.unbounded_send(msg);
207 }
208 }
209 }
210
211 Ok(())
212 }
213
214 fn patch(&mut self, name: &str, value: Value, client_id: Uuid) -> Result<(), Error> {
215 let inserted: bool;
216
217 validate_object_name(name)?;
218
219 if !value.is_object() {
220 return Err(Error::CantMergeObjects);
221 }
222
223 self.log(LogMessage::Patch { object: name.to_string(), value: value.clone(), client: client_id });
224
225 if let Some(object) = self.objects.get_mut(name) {
226 merge_into_object(&mut object.value, &value)?;
227 object.last_modified = Utc::now();
228 inserted = false;
229 } else {
230 self.objects.insert(name.to_string(), Object {
231 name: name.to_string(),
232 value,
233 last_modified: Utc::now(),
234 });
235 inserted = true;
236 }
237
238 let object = self.objects[name].clone();
239
240 if let Some(storage) = &self.storage {
241 if inserted {
242 storage.add_object(object.clone());
243 } else {
244 storage.change_object(object.clone());
245 }
246 }
247
248 for client in self.clients.values_mut() {
249 for query in &mut client.queries {
250 if query.pattern.matches_str(name) {
251 let msg = if query.objects.contains(name) {
252 Message::QueryChange {
253 query_id: query.id,
254 object: object.clone(),
255 }
256 } else {
257 query.objects.insert(name.to_string());
258 Message::QueryAdd {
259 query_id: query.id,
260 object: object.clone(),
261 }
262 };
263
264 let _ = client.inbox_tx.unbounded_send(msg);
265 }
266 }
267 }
268
269 Ok(())
270 }
271
272 fn remove(&mut self, name: &str, client_id: Uuid) -> Result<bool, Error> {
273 validate_object_name(name)?;
274
275 if let Some(object) = self.objects.remove(name) {
276 self.log(LogMessage::Remove { object: name.to_string(), client: client_id });
277
278 if let Some(storage) = &self.storage {
279 storage.remove_object(object.clone());
280 }
281
282 for client in self.clients.values_mut() {
283 for query in &mut client.queries {
284 if query.objects.contains(name) {
285 let msg = Message::QueryRemove {
286 query_id: query.id,
287 object: object.clone()
288 };
289 let _ = client.inbox_tx.unbounded_send(msg);
290
291 query.objects.remove(name);
292 }
293 }
294 }
295
296 Ok(true)
297 } else {
298 Ok(false)
299 }
300 }
301
302 fn internal_emit(&mut self, object: &str, event: &str, data: Value) -> Result<(), Error> {
303 if self.objects.get(object).is_none() {
304 return Err(Error::ObjectNotFound)
305 }
306
307 for client in self.clients.values_mut() {
308 for query in &mut client.queries {
309 if query.objects.contains(object) {
310 let msg = Message::QueryEvent {
311 query_id: query.id,
312 object: object.to_string(),
313 event: event.to_string(),
314 data: data.clone(),
315 };
316 let _ = client.inbox_tx.unbounded_send(msg);
317 }
318 }
319 }
320
321 Ok(())
322 }
323
324 fn emit(&mut self, object: &str, event: &str, data: Value, client_id: Uuid) -> Result<(), Error> {
325 validate_object_name(object)?;
326
327 self.log(LogMessage::Emit { object: object.to_string(), event: event.to_string(), data: data.clone(), client: client_id });
328 self.internal_emit(object, event, data)
329 }
330
331 fn invoke(&mut self, object: &str, method: &str, args: Value, request_id: Value, client_id: Uuid) -> Result<(), Error> {
332 validate_object_name(object)?;
333
334 let invocation_id = Uuid::new_v4();
335
336 self.log(LogMessage::Invoke { object: object.to_string(), method: method.to_string(), args: args.clone(), invocation_id: invocation_id.clone(), client: client_id });
337
338 if self.objects.get(object).is_none() {
339 return Err(Error::ObjectNotFound)
340 }
341
342 for responder in self.clients.values_mut() {
343 for query in &mut responder.queries {
344 if query.provide_rpc {
345 if query.objects.contains(object) {
346 responder.invocations.push(Invocation {
347 id: invocation_id,
348 client_id,
349 request_id,
350 query_id: query.id,
351 });
352
353 let msg = Message::QueryInvocation {
354 query_id: query.id,
355 invocation_id,
356 object: object.to_string(),
357 method: method.to_string(),
358 args: args.clone(),
359 };
360 let _ = responder.inbox_tx.unbounded_send(msg);
361
362 return Ok(())
363 }
364 }
365 }
366 }
367
368 Err(Error::ObjectNotInvocable)
369 }
370
371 fn log(&mut self, message: LogMessage) {
372 self.logger.log(&message);
373
374 self.internal_emit("$system", "log", serde_json::to_value(message).unwrap()).unwrap()
375 }
376}
377
378impl Server {
379 pub fn new(storage: Option<Box<dyn Storage + Send>>, logger: Box<dyn Logger + Send>) -> Self {
380 let mut objects = HashMap::new();
381
382 objects.insert("$system".to_string(), Object {
383 name: "$system".to_string(),
384 value: json!({ "version": VERSION_STRING }),
385 last_modified: Utc::now(),
386 });
387
388 if let Some(ref storage) = storage {
389 for object in storage.get_objects() {
390 objects.insert(object.name.clone(), object);
391 }
392 }
393
394 let shared = Arc::new(Shared {
395 state: Mutex::new(State {
396 objects,
397 clients: HashMap::new(),
398 storage,
399 logger,
400 })
401 });
402
403 Server { shared }
404 }
405
406 pub fn client_connect(&self) -> Client {
407 let mut state = self.shared.state.lock().unwrap();
408
409 let id = Uuid::new_v4();
410
411 let (tx, rx) = unbounded();
412
413 let client = ClientState {
414 id,
415 queries: vec![],
416 invocations: vec![],
417 inbox_tx: tx,
418 disconnect_commands: vec![],
419 };
420
421 state.log(LogMessage::ClientConnect { client: id });
422
423 state.clients.insert(id, client);
424
425 Client { id, server: self.clone(), inbox_rx: rx }
426 }
427
428 fn client_disconnect(&self, client_id: Uuid) {
429 let mut state = self.shared.state.lock().unwrap();
430
431 let client = state.clients.remove(&client_id);
432
433 if let Some(client) = client {
434 for invocation in client.invocations {
435 if let Some(client) = state.clients.get_mut(&invocation.client_id) {
436 let msg = Message::InvocationResult {
437 request_id: invocation.request_id,
438 result: Err(Error::ObjectNotInvocable),
439 };
440 let _ = client.inbox_tx.unbounded_send(msg);
441 }
442 }
443
444 for command in client.disconnect_commands {
445 match command {
446 Command::Set { name, value } => {
447 let _ = state.set(&name, value, client.id);
448 },
449 Command::Patch { name, value } => {
450 let _ = state.patch(&name, value, client.id);
451 },
452 Command::Remove { name } => {
453 let _ = state.remove(&name, client.id);
454 },
455 Command::Emit { object, event, data } => {
456 let _ = state.emit(&object, &event, data, client.id);
457 },
458 }
459 }
460 }
461
462 state.log(LogMessage::ClientDisconnect { client: client_id });
463 }
464
465 pub fn set_disconnect_commands(&self, commands: Vec<Command>, client: &Client) -> Result<(), Error> {
466 let mut state = self.shared.state.lock().unwrap();
467
468 if let Some(client) = state.clients.get_mut(&client.id) {
469 client.disconnect_commands = commands;
470 Ok(())
471 } else {
472 Err(Error::ClientNotFound)
473 }
474 }
475
476 pub fn set(&self, name: &str, value: Value, client: &Client) -> Result<(), Error> {
477 let mut state = self.shared.state.lock().unwrap();
478 state.set(name, value, client.id)
479 }
480
481 pub fn patch(&self, name: &str, value: Value, client: &Client) -> Result<(), Error> {
482 let mut state = self.shared.state.lock().unwrap();
483 state.patch(name, value, client.id)
484 }
485
486 pub fn get(&self, pattern: &Pattern, client: &Client) -> Vec<Object> {
487 let mut state = self.shared.state.lock().unwrap();
488
489 state.log(LogMessage::Get { pattern: pattern.string.clone(), client: client.id });
490
491 state.objects.values().filter(|object| {
492 pattern.matches(&object.name)
493 }).cloned().collect()
494 }
495
496 pub fn query(&self, pattern: &Pattern, provide_rpc: bool, client: &Client) -> Result<(Uuid, Vec<Object>),Error> {
497 let mut state = self.shared.state.lock().unwrap();
498
499 let id = Uuid::new_v4();
500
501 state.log(LogMessage::Query { pattern: pattern.string.clone(), provide_rpc, query: id, client: client.id });
502
503 let objects: Vec<Object> = state.objects.values().filter(|object| {
504 pattern.matches(&object.name)
505 }).cloned().collect();
506
507 if let Some(client) = state.clients.get_mut(&client.id) {
508 client.queries.push(Query {
509 id,
510 pattern: pattern.clone(),
511 provide_rpc,
512 objects: HashSet::from_iter(objects.iter().map(|object| object.name.clone())),
513 });
514 Ok((id, objects))
515 } else {
516 Err(Error::ClientNotFound)
517 }
518 }
519
520 pub fn unsubscribe(&self, query_id: Uuid, client: &Client) -> Result<(), Error> {
521 let mut state = self.shared.state.lock().unwrap();
522
523 state.log(LogMessage::Unsubscribe { query: query_id, client: client.id });
524
525 let mut invocations: Vec<Invocation> = vec![];
526 {
527 let client = state.clients.get_mut(&client.id).unwrap();
528
529 if let Some(index) = client.queries.iter().position(|query| query.id == query_id) {
530 client.queries.remove(index);
531
532 client.invocations.retain(|invocation| {
534 if invocation.query_id == query_id {
535 invocations.push(invocation.clone());
536 return false;
537 } else {
538 return true;
539 }
540 });
541 } else {
542 return Err(Error::QueryNotFound)
543 }
544 }
545
546 for invocation in invocations {
547 if let Some(client) = state.clients.get_mut(&invocation.client_id) {
548 let msg = Message::InvocationResult {
549 request_id: invocation.request_id,
550 result: Err(Error::ObjectNotInvocable),
551 };
552 let _ = client.inbox_tx.unbounded_send(msg);
553 }
554 }
555
556 Ok(())
557 }
558
559 pub fn remove(&self, name: &str, client: &Client) -> Result<bool, Error> {
560 let mut state = self.shared.state.lock().unwrap();
561 state.remove(name, client.id)
562 }
563
564 pub fn emit(&self, object: &str, event: &str, data: Value, client: &Client) -> Result<(), Error> {
565 let mut state = self.shared.state.lock().unwrap();
566 state.emit(object, event, data, client.id)
567 }
568
569 pub fn invoke(&self, object: &str, method: &str, args: Value, request_id: Value, client: &Client) -> Result<(), Error> {
570 let mut state = self.shared.state.lock().unwrap();
571 state.invoke(object, method, args, request_id, client.id)
572 }
573
574 pub fn invoke_result(&self, invocation_id: Uuid, result: Value, client: &Client) -> Result<(), Error> {
575 let mut state = self.shared.state.lock().unwrap();
576
577 state.log(LogMessage::InvokeResult { invocation_id, result: result.clone(), client: client.id });
578
579 let invocation: Option<Invocation> = (|| {
580 let client = state.clients.get_mut(&client.id).unwrap();
581
582 if let Some(index) = client.invocations.iter().position(|invocation| invocation.id == invocation_id) {
583 Some(client.invocations.remove(index))
584 } else {
585 None
586 }
587 })();
588
589 if let Some(invocation) = invocation {
590 if let Some(client) = state.clients.get_mut(&invocation.client_id) {
591 let msg = Message::InvocationResult {
592 request_id: invocation.request_id,
593 result: Ok(result),
594 };
595 let _ = client.inbox_tx.unbounded_send(msg);
596
597 Ok(())
598 } else {
599 Ok(())
601 }
602 } else {
603 Err(Error::InvocationNotFound)
604 }
605 }
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::logger::NullLogger;
    use serde_json::json;

    /// Builds a server without storage backend and with a no-op logger.
    fn create_server() -> Server {
        Server::new(None, Box::new(NullLogger))
    }

    /// Returns a copy of the stored object `name`, if any.
    ///
    /// The state lock is released before returning, so clients can be dropped afterwards.
    fn stored_object(server: &Server, name: &str) -> Option<Object> {
        server.shared.state.lock().unwrap().objects.get(name).cloned()
    }

    /// Pops the next pending inbox message, failing the test when there is none.
    fn next_message(client: &mut Client) -> Message {
        client
            .inbox_try_next()
            .expect("inbox should hold a pending message")
            .expect("inbox channel should still be open")
    }

    /// Asserts that no further message is pending in the inbox.
    fn assert_inbox_empty(client: &mut Client) {
        assert!(client.inbox_try_next().is_err());
    }

    /// Expects the next message to be `QueryAdd` for the given query and object.
    fn expect_add(client: &mut Client, query_id: Uuid, name: &str, value: Value) {
        match next_message(client) {
            Message::QueryAdd { query_id: msg_query_id, object } => {
                assert_eq!(msg_query_id, query_id);
                assert_eq!(object.name, name);
                assert_eq!(object.value, value);
            },
            other => panic!("expected QueryAdd, got {:?}", other),
        }
    }

    /// Expects the next message to be `QueryChange` for the given query and object.
    fn expect_change(client: &mut Client, query_id: Uuid, name: &str, value: Value) {
        match next_message(client) {
            Message::QueryChange { query_id: msg_query_id, object } => {
                assert_eq!(msg_query_id, query_id);
                assert_eq!(object.name, name);
                assert_eq!(object.value, value);
            },
            other => panic!("expected QueryChange, got {:?}", other),
        }
    }

    /// Expects the next message to be `QueryRemove` for the given query and object name
    /// and returns the removed object for further checks.
    fn expect_remove(client: &mut Client, query_id: Uuid, name: &str) -> Object {
        match next_message(client) {
            Message::QueryRemove { query_id: msg_query_id, object } => {
                assert_eq!(msg_query_id, query_id);
                assert_eq!(object.name, name);
                object
            },
            other => panic!("expected QueryRemove, got {:?}", other),
        }
    }

    /// Expects the next message to be the given `QueryEvent`.
    fn expect_event(client: &mut Client, query_id: Uuid, name: &str, expected_event: &str, expected_data: Value) {
        match next_message(client) {
            Message::QueryEvent { query_id: msg_query_id, object, event, data } => {
                assert_eq!(msg_query_id, query_id);
                assert_eq!(object, name);
                assert_eq!(event, expected_event);
                assert_eq!(data, expected_data);
            },
            other => panic!("expected QueryEvent, got {:?}", other),
        }
    }

    /// Expects the next message to be the given `QueryInvocation` and returns its
    /// invocation id.
    fn expect_invocation(client: &mut Client, query_id: Uuid, name: &str, expected_method: &str, expected_args: Value) -> Uuid {
        match next_message(client) {
            Message::QueryInvocation { query_id: msg_query_id, invocation_id, object, method, args } => {
                assert_eq!(msg_query_id, query_id);
                assert_eq!(object, name);
                assert_eq!(method, expected_method);
                assert_eq!(args, expected_args);
                invocation_id
            },
            other => panic!("expected QueryInvocation, got {:?}", other),
        }
    }

    /// Expects the next message to be `InvocationResult` with the given request id and
    /// outcome.
    fn expect_invocation_result(client: &mut Client, expected_request_id: Value, expected: Result<Value, Error>) {
        match next_message(client) {
            Message::InvocationResult { request_id, result } => {
                assert_eq!(request_id, expected_request_id);
                assert_eq!(result, expected);
            },
            other => panic!("expected InvocationResult, got {:?}", other),
        }
    }

    #[test]
    fn test_set_insert() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "bar": true }), &client).unwrap();

        let object = stored_object(&server, "foo").expect("object `foo` should be stored");
        assert_eq!(object.name, "foo");
        assert_eq!(object.value, json!({ "bar": true }));
    }

    #[test]
    fn test_set_update() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "bar": true }), &client).unwrap();
        server.set("foo", json!({ "bar": false }), &client).unwrap();

        let object = stored_object(&server, "foo").expect("object `foo` should be stored");
        assert_eq!(object.value, json!({ "bar": false }));
    }

    #[test]
    fn test_set_invalid_name() {
        let server = create_server();
        let client = server.client_connect();

        let result = server.set("$system", json!({ "bar": true }), &client);
        assert_eq!(result, Err(Error::InvalidObjectName));
    }

    #[test]
    fn test_patch_invalid_name() {
        let server = create_server();
        let client = server.client_connect();

        let result = server.patch("$system", json!({ "bar": true }), &client);
        assert_eq!(result, Err(Error::InvalidObjectName));
    }

    #[test]
    fn test_patch_insert() {
        let server = create_server();
        let client = server.client_connect();

        server.patch("foo", json!({ "bar": true }), &client).unwrap();

        let object = stored_object(&server, "foo").expect("object `foo` should be stored");
        assert_eq!(object.name, "foo");
        assert_eq!(object.value, json!({ "bar": true }));
    }

    #[test]
    fn test_patch_insert_non_object() {
        let server = create_server();
        let client = server.client_connect();

        let result = server.patch("foo", json!(42), &client);
        assert_eq!(result, Err(Error::CantMergeObjects));
    }

    #[test]
    fn test_patch_update_non_object() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!(42), &client).unwrap();

        let result = server.patch("foo", json!({ "baz": true }), &client);
        assert_eq!(result, Err(Error::CantMergeObjects));
    }

    #[test]
    fn test_patch_update_with_non_object() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "bar": true }), &client).unwrap();

        let result = server.patch("foo", json!(42), &client);
        assert_eq!(result, Err(Error::CantMergeObjects));
    }

    #[test]
    fn test_patch_update() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "bar": true }), &client).unwrap();
        server.patch("foo", json!({ "baz": true }), &client).unwrap();

        let object = stored_object(&server, "foo").expect("object `foo` should be stored");
        assert_eq!(object.name, "foo");
        assert_eq!(object.value, json!({ "bar": true, "baz": true }));
    }

    #[test]
    fn test_patch_update_non_deep() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "on": true, "color": { "hue": 100, "saturation": 100 } }), &client).unwrap();
        server.patch("foo", json!({ "color": { "temp": 50 } }), &client).unwrap();

        // Nested objects are replaced as a whole, not merged.
        let object = stored_object(&server, "foo").expect("object `foo` should be stored");
        assert_eq!(object.name, "foo");
        assert_eq!(object.value, json!({ "on": true, "color": { "temp": 50 } }));
    }

    #[test]
    fn test_get() {
        let server = create_server();
        let client = server.client_connect();

        server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client).unwrap();
        server.set("livingroom/humidity", json!({ "humid": 40 }), &client).unwrap();
        server.set("bedroom/temperature", json!({ "temp": 19 }), &client).unwrap();

        let expectations = [
            ("$system", 1),
            ("*", 3),
            ("*,$system", 4),
            ("+/temperature,+/humidity", 3),
            ("livingroom/+", 2),
            ("+/humidity", 1),
        ];

        for (pattern, expected_count) in expectations.iter() {
            let result = server.get(&Pattern::compile(pattern).unwrap(), &client);
            assert_eq!(result.len(), *expected_count, "pattern {:?}", pattern);
        }
    }

    #[test]
    fn test_query() {
        let server = create_server();
        let client1 = server.client_connect();
        let mut client2 = server.client_connect();

        server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client1).unwrap();

        let (query_id, objects) = server.query(&Pattern::compile("+/temperature").unwrap(), false, &client2).unwrap();

        assert_eq!(objects.len(), 1);
        assert_eq!(objects[0].name, "livingroom/temperature");
        assert_eq!(objects[0].value, json!({ "temp": 20.3 }));

        server.set("livingroom/temperature", json!({ "temp": 20.4 }), &client1).unwrap();
        server.set("livingroom/temperature", json!({ "temp": 20.5 }), &client1).unwrap();
        server.set("bedroom/temperature", json!({ "temp": 19.0 }), &client1).unwrap();
        server.set("bedroom/temperature", json!({ "temp": 19.1 }), &client1).unwrap();

        expect_change(&mut client2, query_id, "livingroom/temperature", json!({ "temp": 20.4 }));
        expect_change(&mut client2, query_id, "livingroom/temperature", json!({ "temp": 20.5 }));
        expect_add(&mut client2, query_id, "bedroom/temperature", json!({ "temp": 19.0 }));
        expect_change(&mut client2, query_id, "bedroom/temperature", json!({ "temp": 19.1 }));

        assert_inbox_empty(&mut client2);
    }

    #[test]
    fn test_unsubscribe() {
        let server = create_server();
        let client1 = server.client_connect();
        let mut client2 = server.client_connect();

        server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client1).unwrap();

        let (query_id, _) = server.query(&Pattern::compile("+/temperature").unwrap(), false, &client2).unwrap();

        server.set("livingroom/temperature", json!({ "temp": 20.4 }), &client1).unwrap();

        expect_change(&mut client2, query_id, "livingroom/temperature", json!({ "temp": 20.4 }));

        server.unsubscribe(query_id, &client2).unwrap();

        server.set("livingroom/temperature", json!({ "temp": 20.5 }), &client1).unwrap();

        assert_inbox_empty(&mut client2);
    }

    #[test]
    fn test_remove_non_existing() {
        let server = create_server();
        let client = server.client_connect();

        let existed = server.remove("foo", &client).unwrap();
        assert!(!existed);
    }

    #[test]
    fn test_remove_existing() {
        let server = create_server();
        let client = server.client_connect();

        server.set("foo", json!({ "bar": 1 }), &client).unwrap();

        let existed = server.remove("foo", &client).unwrap();
        assert!(existed);
    }

    #[test]
    fn test_remove_query() {
        let server = create_server();
        let writer = server.client_connect();

        server.set("foo", json!({ "bar": 1 }), &writer).unwrap();

        let mut watcher = server.client_connect();

        let (query_id, _) = server.query(&Pattern::compile("*").unwrap(), false, &watcher).unwrap();

        server.remove("foo", &watcher).unwrap();

        let removed = expect_remove(&mut watcher, query_id, "foo");
        assert_eq!(removed.value, json!({ "bar": 1 }));

        server.set("foo", json!({ "bar": 1 }), &watcher).unwrap();

        expect_add(&mut watcher, query_id, "foo", json!({ "bar": 1 }));

        assert_inbox_empty(&mut watcher);
    }

    #[test]
    fn test_emit_event() {
        let server = create_server();
        let writer = server.client_connect();

        server.set("gamepad", json!({ "buttons": ["a", "b"] }), &writer).unwrap();

        let mut watcher = server.client_connect();

        let (query_id, _) = server.query(&Pattern::compile("*").unwrap(), false, &watcher).unwrap();

        server.emit("gamepad", "buttonpress", json!({ "button": "a" }), &watcher).unwrap();

        expect_event(&mut watcher, query_id, "gamepad", "buttonpress", json!({ "button": "a" }));

        assert_inbox_empty(&mut watcher);
    }

    #[test]
    fn test_emit_event_doesnt_exist() {
        let server = create_server();
        let client = server.client_connect();

        let result = server.emit("gamepad", "buttonpress", json!({ "button": "a" }), &client);

        assert_eq!(result, Err(Error::ObjectNotFound));
    }

    #[test]
    fn test_invoke_doesnt_exist() {
        let server = create_server();
        let client = server.client_connect();

        let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &client);

        assert_eq!(result, Err(Error::ObjectNotFound));
    }

    #[test]
    fn test_invoke_not_invokable() {
        let server = create_server();
        let client = server.client_connect();

        server.set("lamp", json!({ "on": false }), &client).unwrap();

        let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &client);

        assert_eq!(result, Err(Error::ObjectNotInvocable));
    }

    #[test]
    fn test_invoke() {
        let server = create_server();
        let mut provider = server.client_connect();
        let mut consumer = server.client_connect();

        server.set("lamp", json!({ "on": false }), &provider).unwrap();
        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();

        let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
        assert_eq!(result, Ok(()));

        let invocation_id = expect_invocation(&mut provider, query_id, "lamp", "setState", json!({ "on": true }));

        server.invoke_result(invocation_id, json!({ "success": true }), &provider).unwrap();

        expect_invocation_result(&mut consumer, json!(1), Ok(json!({ "success": true })));
    }

    #[test]
    fn test_invoke_client_disconnect() {
        let server = create_server();
        let mut provider = server.client_connect();
        let mut consumer = server.client_connect();

        server.set("lamp", json!({ "on": false }), &provider).unwrap();
        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();

        let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
        assert_eq!(result, Ok(()));

        expect_invocation(&mut provider, query_id, "lamp", "setState", json!({ "on": true }));

        // The provider goes away without answering the invocation.
        drop(provider);

        expect_invocation_result(&mut consumer, json!(1), Err(Error::ObjectNotInvocable));
    }

    #[test]
    fn test_invoke_unsubscribe() {
        let server = create_server();
        let mut provider = server.client_connect();
        let mut consumer = server.client_connect();

        server.set("lamp", json!({ "on": false }), &provider).unwrap();
        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();

        let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
        assert_eq!(result, Ok(()));

        expect_invocation(&mut provider, query_id, "lamp", "setState", json!({ "on": true }));

        // The provider cancels its query without answering the invocation.
        server.unsubscribe(query_id, &provider).unwrap();

        expect_invocation_result(&mut consumer, json!(1), Err(Error::ObjectNotInvocable));
    }

    #[test]
    fn test_disconnect_command_set() {
        let server = create_server();
        let mut observer = server.client_connect();
        let device = server.client_connect();

        server.set("lamp", json!({ "online": true }), &device).unwrap();
        server.set_disconnect_commands(vec![
            Command::Set {
                name: "lamp".to_string(),
                value: json!({ "online": false }),
            }
        ], &device).unwrap();

        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();

        drop(device);

        expect_change(&mut observer, query_id, "lamp", json!({ "online": false }));

        assert_inbox_empty(&mut observer);
    }

    #[test]
    fn test_disconnect_command_patch() {
        let server = create_server();
        let mut observer = server.client_connect();
        let device = server.client_connect();

        server.patch("lamp", json!({ "online": true }), &device).unwrap();
        server.set_disconnect_commands(vec![
            Command::Patch {
                name: "lamp".to_string(),
                value: json!({ "online": false }),
            }
        ], &device).unwrap();

        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();

        drop(device);

        expect_change(&mut observer, query_id, "lamp", json!({ "online": false }));

        assert_inbox_empty(&mut observer);
    }

    #[test]
    fn test_disconnect_command_remove() {
        let server = create_server();
        let mut observer = server.client_connect();
        let device = server.client_connect();

        server.patch("client", json!({ "online": true }), &device).unwrap();
        server.set_disconnect_commands(vec![
            Command::Remove {
                name: "client".to_string(),
            }
        ], &device).unwrap();

        let (query_id, _) = server.query(&Pattern::compile("client").unwrap(), true, &observer).unwrap();

        drop(device);

        expect_remove(&mut observer, query_id, "client");

        assert_inbox_empty(&mut observer);
    }

    #[test]
    fn test_disconnect_command_emit() {
        let server = create_server();
        let mut observer = server.client_connect();
        let device = server.client_connect();

        server.set("lamp", json!({ "on": false }), &device).unwrap();
        server.set_disconnect_commands(vec![
            Command::Emit {
                object: "lamp".to_string(),
                event: "offline".to_string(),
                data: json!({}),
            }
        ], &device).unwrap();

        let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();

        drop(device);

        expect_event(&mut observer, query_id, "lamp", "offline", json!({}));

        assert_inbox_empty(&mut observer);
    }
}