objtalk/server/
mod.rs

1use chrono::prelude::*;
2use crate::{Object, Command, VERSION_STRING};
3use crate::patterns::Pattern;
4use crate::server::logger::{Logger, LogMessage};
5use crate::server::storage::Storage;
6use futures::channel::mpsc::{unbounded, UnboundedSender, UnboundedReceiver, TryRecvError};
7use futures::StreamExt;
8use serde_json::{Value, json};
9use std::collections::{HashMap, HashSet};
10use std::iter::FromIterator;
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13use uuid::Uuid;
14
15pub mod storage;
16pub mod json_rpc;
17pub mod http_transport;
18pub mod tcp_transport;
19pub mod config;
20pub mod logger;
21pub mod admin;
22
/// Errors returned by the server's object, query and invocation operations.
#[derive(Error, Debug, PartialEq)]
pub enum Error {
	/// The object name is empty or starts with `$`.
	#[error("invalid object name")]
	InvalidObjectName,
	/// No object with the requested name exists.
	#[error("object not found")]
	ObjectNotFound,
	/// A patch was attempted where the stored value or the patch is not a JSON object.
	#[error("object values not mergeable")]
	CantMergeObjects,
	/// The client has no query with the given id.
	#[error("query not found")]
	QueryNotFound,
	/// The client is not registered with the server.
	#[error("client not found")]
	ClientNotFound,
	/// No query providing RPC currently covers the object; also delivered as an
	/// invocation result when the provider disconnects or unsubscribes.
	#[error("not invocable")]
	ObjectNotInvocable,
	/// The client has no pending invocation with the given id.
	#[error("invocation not found")]
	InvocationNotFound,
}
40
41fn validate_object_name(name: &str) -> Result<(), Error> {
42	if name == "" || name.starts_with("$") {
43		Err(Error::InvalidObjectName)
44	} else {
45		Ok(())
46	}
47}
48
49fn merge_into_object(old: &mut Value, new: &Value) -> Result<(), Error> {
50	match (old, new) {
51		(&mut Value::Object(ref mut a), &Value::Object(ref b)) => {
52			for (k, v) in b {
53				a.insert(k.to_string(), v.clone());
54			}
55			
56			Ok(())
57		},
58		_ => Err(Error::CantMergeObjects),
59	}
60}
61
/// Messages the server places into a client's inbox.
#[derive(Debug)]
pub enum Message {
	/// An object started being part of the query's result set.
	QueryAdd {
		query_id: Uuid,
		object: Object,
	},
	/// An object already in the query's result set was set or patched.
	QueryChange {
		query_id: Uuid,
		object: Object,
	},
	/// An object in the query's result set was removed; carries the removed object.
	QueryRemove {
		query_id: Uuid,
		object: Object,
	},
	/// An event was emitted on an object in the query's result set.
	QueryEvent {
		query_id: Uuid,
		object: String,
		event: String,
		data: Value,
	},
	/// A method invocation routed to a query that provides RPC.
	QueryInvocation {
		query_id: Uuid,
		invocation_id: Uuid,
		object: String,
		method: String,
		args: Value,
	},
	/// The outcome of an invocation, delivered to the client that issued it.
	InvocationResult {
		/// The request id the invoking client passed to `invoke`.
		request_id: Value,
		result: Result<Value, Error>,
	},
}
94
/// A method invocation that has been forwarded to a provider and awaits its result.
#[derive(Debug, Clone)]
struct Invocation {
	/// Server-generated id the provider uses to report the result.
	id: Uuid,
	/// Id of the client that issued the invocation.
	client_id: Uuid,
	/// Request id supplied by the invoking client; echoed back with the result.
	request_id: Value,
	/// Id of the provider's query the invocation was routed through.
	query_id: Uuid,
}
102
/// A client's subscription to all objects matching a pattern.
#[derive(Debug)]
struct Query {
	id: Uuid,
	/// Pattern object names are matched against.
	pattern: Pattern,
	/// Whether invocations on the query's objects may be routed to the owning client.
	provide_rpc: bool,
	/// Names of the objects currently reported as part of this query.
	objects: HashSet<String>,
}
110
/// Server-side bookkeeping for one connected client.
#[derive(Debug)]
pub struct ClientState {
	#[allow(dead_code)]
	id: Uuid,
	/// Queries the client is currently subscribed to.
	queries: Vec<Query>,
	/// Invocations routed to this client that have not been answered yet.
	invocations: Vec<Invocation>,
	/// Sending half of the client's inbox channel.
	inbox_tx: UnboundedSender<Message>,
	/// Commands executed on the client's behalf when it disconnects.
	disconnect_commands: Vec<Command>,
}
120
/// Handle for a connected client; dropping it disconnects the client from the server.
pub struct Client {
	id: Uuid,
	/// Server this client was created by; used to deregister on drop.
	server: Server,
	/// Receiving half of the client's inbox channel.
	inbox_rx: UnboundedReceiver<Message>,
}
126
127impl Client {
128	pub async fn inbox_next(&mut self) -> Option<Message> {
129		self.inbox_rx.next().await
130	}
131	
132	pub fn inbox_try_next(&mut self) -> Result<Option<Message>, TryRecvError> {
133		self.inbox_rx.try_next()
134	}
135}
136
137impl Drop for Client {
138	fn drop(&mut self) {
139		self.server.client_disconnect(self.id);
140	}
141}
142
/// Cheaply cloneable handle to the server; all clones share the same state.
#[derive(Clone)]
pub struct Server {
	shared: Arc<Shared>,
}
147
/// State shared between all `Server` handles, guarded by a mutex.
struct Shared {
	state: Mutex<State>,
}
151
/// The server's mutable state: objects, connected clients and the configured backends.
struct State {
	/// All objects, keyed by name.
	objects: HashMap<String,Object>,
	/// Connected clients, keyed by client id.
	clients: HashMap<Uuid,ClientState>,
	/// Optional backend that is told about added, changed and removed objects.
	storage: Option<Box<dyn Storage + Send>>,
	/// Sink that receives every log message.
	logger: Box<dyn Logger + Send>,
}
158
159impl State {
160	fn set(&mut self, name: &str, value: Value, client_id: Uuid) -> Result<(), Error> {
161		let inserted: bool;
162		
163		validate_object_name(name)?;
164		
165		self.log(LogMessage::Set { object: name.to_string(), value: value.clone(), client: client_id });
166		
167		if let Some(object) = self.objects.get_mut(name) {
168			object.value = value;
169			object.last_modified = Utc::now();
170			inserted = false;
171		} else {
172			self.objects.insert(name.to_string(), Object {
173				name: name.to_string(),
174				value,
175				last_modified: Utc::now(),
176			});
177			inserted = true;
178		}
179		
180		let object = self.objects[name].clone();
181		
182		if let Some(storage) = &self.storage {
183			if inserted {
184				storage.add_object(object.clone());
185			} else {
186				storage.change_object(object.clone());
187			}
188		}
189		
190		for client in self.clients.values_mut() {
191			for query in &mut client.queries {
192				if query.pattern.matches_str(name) {
193					let msg = if query.objects.contains(name) {
194						Message::QueryChange {
195							query_id: query.id,
196							object: object.clone(),
197						}
198					} else {
199						query.objects.insert(name.to_string());
200						Message::QueryAdd {
201							query_id: query.id,
202							object: object.clone(),
203						}
204					};
205					
206					let _ = client.inbox_tx.unbounded_send(msg);
207				}
208			}
209		}
210		
211		Ok(())
212	}
213	
214	fn patch(&mut self, name: &str, value: Value, client_id: Uuid) -> Result<(), Error> {
215		let inserted: bool;
216		
217		validate_object_name(name)?;
218		
219		if !value.is_object() {
220			return Err(Error::CantMergeObjects);
221		}
222		
223		self.log(LogMessage::Patch { object: name.to_string(), value: value.clone(), client: client_id });
224		
225		if let Some(object) = self.objects.get_mut(name) {
226			merge_into_object(&mut object.value, &value)?;
227			object.last_modified = Utc::now();
228			inserted = false;
229		} else {
230			self.objects.insert(name.to_string(), Object {
231				name: name.to_string(),
232				value,
233				last_modified: Utc::now(),
234			});
235			inserted = true;
236		}
237		
238		let object = self.objects[name].clone();
239		
240		if let Some(storage) = &self.storage {
241			if inserted {
242				storage.add_object(object.clone());
243			} else {
244				storage.change_object(object.clone());
245			}
246		}
247		
248		for client in self.clients.values_mut() {
249			for query in &mut client.queries {
250				if query.pattern.matches_str(name) {
251					let msg = if query.objects.contains(name) {
252						Message::QueryChange {
253							query_id: query.id,
254							object: object.clone(),
255						}
256					} else {
257						query.objects.insert(name.to_string());
258						Message::QueryAdd {
259							query_id: query.id,
260							object: object.clone(),
261						}
262					};
263					
264					let _ = client.inbox_tx.unbounded_send(msg);
265				}
266			}
267		}
268		
269		Ok(())
270	}
271	
272	fn remove(&mut self, name: &str, client_id: Uuid) -> Result<bool, Error> {
273		validate_object_name(name)?;
274		
275		if let Some(object) = self.objects.remove(name) {
276			self.log(LogMessage::Remove { object: name.to_string(), client: client_id });
277			
278			if let Some(storage) = &self.storage {
279				storage.remove_object(object.clone());
280			}
281			
282			for client in self.clients.values_mut() {
283				for query in &mut client.queries {
284					if query.objects.contains(name) {
285						let msg = Message::QueryRemove {
286							query_id: query.id,
287							object: object.clone()
288						};
289						let _ = client.inbox_tx.unbounded_send(msg);
290						
291						query.objects.remove(name);
292					}
293				}
294			}
295			
296			Ok(true)
297		} else {
298			Ok(false)
299		}
300	}
301	
302	fn internal_emit(&mut self, object: &str, event: &str, data: Value) -> Result<(), Error> {
303		if self.objects.get(object).is_none() {
304			return Err(Error::ObjectNotFound)
305		}
306		
307		for client in self.clients.values_mut() {
308			for query in &mut client.queries {
309				if query.objects.contains(object) {
310					let msg = Message::QueryEvent {
311						query_id: query.id,
312						object: object.to_string(),
313						event: event.to_string(),
314						data: data.clone(),
315					};
316					let _ = client.inbox_tx.unbounded_send(msg);
317				}
318			}
319		}
320		
321		Ok(())
322	}
323	
324	fn emit(&mut self, object: &str, event: &str, data: Value, client_id: Uuid) -> Result<(), Error> {
325		validate_object_name(object)?;
326		
327		self.log(LogMessage::Emit { object: object.to_string(), event: event.to_string(), data: data.clone(), client: client_id });
328		self.internal_emit(object, event, data)
329	}
330	
331	fn invoke(&mut self, object: &str, method: &str, args: Value, request_id: Value, client_id: Uuid) -> Result<(), Error> {
332		validate_object_name(object)?;
333		
334		let invocation_id = Uuid::new_v4();
335		
336		self.log(LogMessage::Invoke { object: object.to_string(), method: method.to_string(), args: args.clone(), invocation_id: invocation_id.clone(), client: client_id });
337		
338		if self.objects.get(object).is_none() {
339			return Err(Error::ObjectNotFound)
340		}
341		
342		for responder in self.clients.values_mut() {
343			for query in &mut responder.queries {
344				if query.provide_rpc {
345					if query.objects.contains(object) {
346						responder.invocations.push(Invocation {
347							id: invocation_id,
348							client_id,
349							request_id,
350							query_id: query.id,
351						});
352						
353						let msg = Message::QueryInvocation {
354							query_id: query.id,
355							invocation_id,
356							object: object.to_string(),
357							method: method.to_string(),
358							args: args.clone(),
359						};
360						let _ = responder.inbox_tx.unbounded_send(msg);
361						
362						return Ok(())
363					}
364				}
365			}
366		}
367		
368		Err(Error::ObjectNotInvocable)
369	}
370	
371	fn log(&mut self, message: LogMessage) {
372		self.logger.log(&message);
373		
374		self.internal_emit("$system", "log", serde_json::to_value(message).unwrap()).unwrap()
375	}
376}
377
378impl Server {
379	pub fn new(storage: Option<Box<dyn Storage + Send>>, logger: Box<dyn Logger + Send>) -> Self {
380		let mut objects = HashMap::new();
381		
382		objects.insert("$system".to_string(), Object {
383			name: "$system".to_string(),
384			value: json!({ "version": VERSION_STRING }),
385			last_modified: Utc::now(),
386		});
387		
388		if let Some(ref storage) = storage {
389			for object in storage.get_objects() {
390				objects.insert(object.name.clone(), object);
391			}
392		}
393		
394		let shared = Arc::new(Shared {
395			state: Mutex::new(State {
396				objects,
397				clients: HashMap::new(),
398				storage,
399				logger,
400			})
401		});
402		
403		Server { shared }
404	}
405	
406	pub fn client_connect(&self) -> Client {
407		let mut state = self.shared.state.lock().unwrap();
408		
409		let id = Uuid::new_v4();
410		
411		let (tx, rx) = unbounded();
412		
413		let client = ClientState {
414			id,
415			queries: vec![],
416			invocations: vec![],
417			inbox_tx: tx,
418			disconnect_commands: vec![],
419		};
420		
421		state.log(LogMessage::ClientConnect { client: id });
422		
423		state.clients.insert(id, client);
424		
425		Client { id, server: self.clone(), inbox_rx: rx }
426	}
427	
428	fn client_disconnect(&self, client_id: Uuid) {
429		let mut state = self.shared.state.lock().unwrap();
430		
431		let client = state.clients.remove(&client_id);
432		
433		if let Some(client) = client {
434			for invocation in client.invocations {
435				if let Some(client) = state.clients.get_mut(&invocation.client_id) {
436					let msg = Message::InvocationResult {
437						request_id: invocation.request_id,
438						result: Err(Error::ObjectNotInvocable),
439					};
440					let _ = client.inbox_tx.unbounded_send(msg);
441				}
442			}
443			
444			for command in client.disconnect_commands {
445				match command {
446					Command::Set { name, value } => {
447						let _ = state.set(&name, value, client.id);
448					},
449					Command::Patch { name, value } => {
450						let _ = state.patch(&name, value, client.id);
451					},
452					Command::Remove { name } => {
453						let _ = state.remove(&name, client.id);
454					},
455					Command::Emit { object, event, data } => {
456						let _ = state.emit(&object, &event, data, client.id);
457					},
458				}
459			}
460		}
461		
462		state.log(LogMessage::ClientDisconnect { client: client_id });
463	}
464	
465	pub fn set_disconnect_commands(&self, commands: Vec<Command>, client: &Client) -> Result<(), Error> {
466		let mut state = self.shared.state.lock().unwrap();
467		
468		if let Some(client) = state.clients.get_mut(&client.id) {
469			client.disconnect_commands = commands;
470			Ok(())
471		} else {
472			Err(Error::ClientNotFound)
473		}
474	}
475	
476	pub fn set(&self, name: &str, value: Value, client: &Client) -> Result<(), Error> {
477		let mut state = self.shared.state.lock().unwrap();
478		state.set(name, value, client.id)
479	}
480	
481	pub fn patch(&self, name: &str, value: Value, client: &Client) -> Result<(), Error> {
482		let mut state = self.shared.state.lock().unwrap();
483		state.patch(name, value, client.id)
484	}
485	
486	pub fn get(&self, pattern: &Pattern, client: &Client) -> Vec<Object> {
487		let mut state = self.shared.state.lock().unwrap();
488		
489		state.log(LogMessage::Get { pattern: pattern.string.clone(), client: client.id });
490		
491		state.objects.values().filter(|object| {
492			pattern.matches(&object.name)
493		}).cloned().collect()
494	}
495	
496	pub fn query(&self, pattern: &Pattern, provide_rpc: bool, client: &Client) -> Result<(Uuid, Vec<Object>),Error> {
497		let mut state = self.shared.state.lock().unwrap();
498		
499		let id = Uuid::new_v4();
500		
501		state.log(LogMessage::Query { pattern: pattern.string.clone(), provide_rpc, query: id, client: client.id });
502		
503		let objects: Vec<Object> = state.objects.values().filter(|object| {
504			pattern.matches(&object.name)
505		}).cloned().collect();
506		
507		if let Some(client) = state.clients.get_mut(&client.id) {
508			client.queries.push(Query {
509				id,
510				pattern: pattern.clone(),
511				provide_rpc,
512				objects: HashSet::from_iter(objects.iter().map(|object| object.name.clone())),
513			});
514			Ok((id, objects))
515		} else {
516			Err(Error::ClientNotFound)
517		}
518	}
519	
520	pub fn unsubscribe(&self, query_id: Uuid, client: &Client) -> Result<(), Error> {
521		let mut state = self.shared.state.lock().unwrap();
522		
523		state.log(LogMessage::Unsubscribe { query: query_id, client: client.id });
524		
525		let mut invocations: Vec<Invocation> = vec![];
526		{
527			let client = state.clients.get_mut(&client.id).unwrap();
528			
529			if let Some(index) = client.queries.iter().position(|query| query.id == query_id) {
530				client.queries.remove(index);
531				
532				// TODO: optimize away the vector and cloning
533				client.invocations.retain(|invocation| {
534					if invocation.query_id == query_id {
535						invocations.push(invocation.clone());
536						return false;
537					} else {
538						return true;
539					}
540				});
541			} else {
542				return Err(Error::QueryNotFound)
543			}
544		}
545		
546		for invocation in invocations {
547			if let Some(client) = state.clients.get_mut(&invocation.client_id) {
548				let msg = Message::InvocationResult {
549					request_id: invocation.request_id,
550					result: Err(Error::ObjectNotInvocable),
551				};
552				let _ = client.inbox_tx.unbounded_send(msg);
553			}
554		}
555		
556		Ok(())
557	}
558	
559	pub fn remove(&self, name: &str, client: &Client) -> Result<bool, Error> {
560		let mut state = self.shared.state.lock().unwrap();
561		state.remove(name, client.id)
562	}
563	
564	pub fn emit(&self, object: &str, event: &str, data: Value, client: &Client) -> Result<(), Error> {
565		let mut state = self.shared.state.lock().unwrap();
566		state.emit(object, event, data, client.id)
567	}
568	
569	pub fn invoke(&self, object: &str, method: &str, args: Value, request_id: Value, client: &Client) -> Result<(), Error> {
570		let mut state = self.shared.state.lock().unwrap();
571		state.invoke(object, method, args, request_id, client.id)
572	}
573	
574	pub fn invoke_result(&self, invocation_id: Uuid, result: Value, client: &Client) -> Result<(), Error> {
575		let mut state = self.shared.state.lock().unwrap();
576		
577		state.log(LogMessage::InvokeResult { invocation_id, result: result.clone(), client: client.id });
578		
579		let invocation: Option<Invocation> = (|| {
580			let client = state.clients.get_mut(&client.id).unwrap();
581			
582			if let Some(index) = client.invocations.iter().position(|invocation| invocation.id == invocation_id) {
583				Some(client.invocations.remove(index))
584			} else {
585				None
586			}
587		})();
588		
589		if let Some(invocation) = invocation {
590			if let Some(client) = state.clients.get_mut(&invocation.client_id) {
591				let msg = Message::InvocationResult {
592					request_id: invocation.request_id,
593					result: Ok(result),
594				};
595				let _ = client.inbox_tx.unbounded_send(msg);
596				
597				Ok(())
598			} else {
599				// client disconnected -> ignore
600				Ok(())
601			}
602		} else {
603			Err(Error::InvocationNotFound)
604		}
605	}
606}
607
608#[cfg(test)]
609mod tests {
610	use super::*;
611	use crate::server::logger::NullLogger;
612	use serde_json::json;
613	
614	fn create_server() -> Server {
615		Server::new(None, Box::new(NullLogger))
616	}
617	
618	#[test]
619	fn test_set_insert() {
620		let server = create_server();
621		let client = server.client_connect();
622		
623		server.set("foo", json!({ "bar": true }), &client).unwrap();
624		
625		let state = server.shared.state.lock().unwrap();
626		assert!(state.objects.contains_key("foo"));
627		assert_eq!(state.objects["foo"].name, "foo");
628		assert_eq!(state.objects["foo"].value, json!({ "bar": true }));
629	}
630	
631	#[test]
632	fn test_set_update() {
633		let server = create_server();
634		let client = server.client_connect();
635		
636		server.set("foo", json!({ "bar": true }), &client).unwrap();
637		server.set("foo", json!({ "bar": false }), &client).unwrap();
638		
639		let state = server.shared.state.lock().unwrap();
640		assert_eq!(state.objects["foo"].value, json!({ "bar": false }));
641	}
642	
643	#[test]
644	fn test_set_invalid_name() {
645		let server = create_server();
646		let client = server.client_connect();
647		
648		let result = server.set("$system", json!({ "bar": true }), &client);
649		assert_eq!(result, Err(Error::InvalidObjectName));
650	}
651	
652	#[test]
653	fn test_patch_invalid_name() {
654		let server = create_server();
655		let client = server.client_connect();
656		
657		let result = server.patch("$system", json!({ "bar": true }), &client);
658		assert_eq!(result, Err(Error::InvalidObjectName));
659	}
660	
661	#[test]
662	fn test_patch_insert() {
663		let server = create_server();
664		let client = server.client_connect();
665		
666		server.patch("foo", json!({ "bar": true }), &client).unwrap();
667		
668		let state = server.shared.state.lock().unwrap();
669		assert!(state.objects.contains_key("foo"));
670		assert_eq!(state.objects["foo"].name, "foo");
671		assert_eq!(state.objects["foo"].value, json!({ "bar": true }));
672	}
673	
674	#[test]
675	fn test_patch_insert_non_object() {
676		let server = create_server();
677		let client = server.client_connect();
678		
679		let result = server.patch("foo", json!(42), &client);
680		assert_eq!(result, Err(Error::CantMergeObjects));
681	}
682	
683	#[test]
684	fn test_patch_update_non_object() {
685		let server = create_server();
686		let client = server.client_connect();
687		
688		server.set("foo", json!(42), &client).unwrap();
689		
690		let result = server.patch("foo", json!({ "baz": true }), &client);
691		assert_eq!(result, Err(Error::CantMergeObjects));
692	}
693	
694	#[test]
695	fn test_patch_update_with_non_object() {
696		let server = create_server();
697		let client = server.client_connect();
698		
699		server.set("foo", json!({ "bar": true }), &client).unwrap();
700		
701		let result = server.patch("foo", json!(42), &client);
702		assert_eq!(result, Err(Error::CantMergeObjects));
703	}
704	
705	#[test]
706	fn test_patch_update() {
707		let server = create_server();
708		let client = server.client_connect();
709		
710		server.set("foo", json!({ "bar": true }), &client).unwrap();
711		server.patch("foo", json!({ "baz": true }), &client).unwrap();
712		
713		let state = server.shared.state.lock().unwrap();
714		assert!(state.objects.contains_key("foo"));
715		assert_eq!(state.objects["foo"].name, "foo");
716		assert_eq!(state.objects["foo"].value, json!({ "bar": true, "baz": true }));
717	}
718	
719	#[test]
720	fn test_patch_update_non_deep() {
721		let server = create_server();
722		let client = server.client_connect();
723		
724		server.set("foo", json!({ "on": true, "color": { "hue": 100, "saturation": 100 } }), &client).unwrap();
725		server.patch("foo", json!({ "color": { "temp": 50 } }), &client).unwrap();
726		
727		let state = server.shared.state.lock().unwrap();
728		assert!(state.objects.contains_key("foo"));
729		assert_eq!(state.objects["foo"].name, "foo");
730		assert_eq!(state.objects["foo"].value, json!({ "on": true, "color": { "temp": 50 } }));
731	}
732	
733	#[test]
734	fn test_get() {
735		let server = create_server();
736		let client = server.client_connect();
737		
738		server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client).unwrap();
739		server.set("livingroom/humidity", json!({ "humid": 40 }), &client).unwrap();
740		server.set("bedroom/temperature", json!({ "temp": 19 }), &client).unwrap();
741		
742		let result = server.get(&Pattern::compile("$system").unwrap(), &client);
743		assert_eq!(result.len(), 1);
744		
745		let result = server.get(&Pattern::compile("*").unwrap(), &client);
746		assert_eq!(result.len(), 3);
747		
748		let result = server.get(&Pattern::compile("*,$system").unwrap(), &client);
749		assert_eq!(result.len(), 4);
750		
751		let result = server.get(&Pattern::compile("+/temperature,+/humidity").unwrap(), &client);
752		assert_eq!(result.len(), 3);
753		
754		let result = server.get(&Pattern::compile("livingroom/+").unwrap(), &client);
755		assert_eq!(result.len(), 2);
756		
757		let result = server.get(&Pattern::compile("+/humidity").unwrap(), &client);
758		assert_eq!(result.len(), 1);
759	}
760	
761	#[test]
762	fn test_query() {
763		let server = create_server();
764		let client1 = server.client_connect();
765		let mut client2 = server.client_connect();
766		
767		server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client1).unwrap();
768		
769		let (query_id, objects) = server.query(&Pattern::compile("+/temperature").unwrap(), false, &client2).unwrap();
770		
771		assert_eq!(objects.len(), 1);
772		assert_eq!(objects[0].name, "livingroom/temperature");
773		assert_eq!(objects[0].value, json!({ "temp": 20.3 }));
774		
775		server.set("livingroom/temperature", json!({ "temp": 20.4 }), &client1).unwrap();
776		server.set("livingroom/temperature", json!({ "temp": 20.5 }), &client1).unwrap();
777		server.set("bedroom/temperature", json!({ "temp": 19.0 }), &client1).unwrap();
778		server.set("bedroom/temperature", json!({ "temp": 19.1 }), &client1).unwrap();
779		
780		let msg = client2.inbox_try_next().unwrap().unwrap();
781		
782		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
783			assert_eq!(msg_query_id, query_id);
784			assert_eq!(object.name, "livingroom/temperature");
785			assert_eq!(object.value, json!({ "temp": 20.4 }));
786		} else {
787			assert!(false);
788		}
789		
790		let msg = client2.inbox_try_next().unwrap().unwrap();
791		
792		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
793			assert_eq!(msg_query_id, query_id);
794			assert_eq!(object.name, "livingroom/temperature");
795			assert_eq!(object.value, json!({ "temp": 20.5 }));
796		} else {
797			assert!(false);
798		}
799		
800		let msg = client2.inbox_try_next().unwrap().unwrap();
801		
802		if let Message::QueryAdd { query_id: msg_query_id, object } = msg {
803			assert_eq!(msg_query_id, query_id);
804			assert_eq!(object.name, "bedroom/temperature");
805			assert_eq!(object.value, json!({ "temp": 19.0 }));
806		} else {
807			assert!(false);
808		}
809		
810		let msg = client2.inbox_try_next().unwrap().unwrap();
811		
812		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
813			assert_eq!(msg_query_id, query_id);
814			assert_eq!(object.name, "bedroom/temperature");
815			assert_eq!(object.value, json!({ "temp": 19.1 }));
816		} else {
817			assert!(false);
818		}
819		
820		assert!(client2.inbox_try_next().is_err());
821	}
822	
823	#[test]
824	fn test_unsubscribe() {
825		let server = create_server();
826		let client1 = server.client_connect();
827		let mut client2 = server.client_connect();
828		
829		server.set("livingroom/temperature", json!({ "temp": 20.3 }), &client1).unwrap();
830		
831		let (query_id, _) = server.query(&Pattern::compile("+/temperature").unwrap(), false, &client2).unwrap();
832		
833		server.set("livingroom/temperature", json!({ "temp": 20.4 }), &client1).unwrap();
834		
835		let msg = client2.inbox_try_next().unwrap().unwrap();
836		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
837			assert_eq!(msg_query_id, query_id);
838			assert_eq!(object.name, "livingroom/temperature");
839			assert_eq!(object.value, json!({ "temp": 20.4 }));
840		} else {
841			assert!(false);
842		}
843		
844		server.unsubscribe(query_id, &client2).unwrap();
845		
846		server.set("livingroom/temperature", json!({ "temp": 20.5 }), &client1).unwrap();
847		
848		assert!(client2.inbox_try_next().is_err());
849	}
850	
851	#[test]
852	fn test_remove_non_existing() {
853		let server = create_server();
854		let client = server.client_connect();
855		
856		let existed = server.remove("foo", &client).unwrap();
857		assert!(!existed);
858	}
859	
860	#[test]
861	fn test_remove_existing() {
862		let server = create_server();
863		let client = server.client_connect();
864		
865		server.set("foo", json!({ "bar": 1 }), &client).unwrap();
866		
867		let existed = server.remove("foo", &client).unwrap();
868		assert!(existed);
869	}
870	
871	#[test]
872	fn test_remove_query() {
873		let server = create_server();
874		let client = server.client_connect();
875		
876		server.set("foo", json!({ "bar": 1 }), &client).unwrap();
877		
878		let mut client = server.client_connect();
879		
880		let (query_id, _) = server.query(&Pattern::compile("*").unwrap(), false, &client).unwrap();
881		
882		server.remove("foo", &client).unwrap();
883		
884		let msg = client.inbox_try_next().unwrap().unwrap();
885		
886		if let Message::QueryRemove { query_id: msg_query_id, object } = msg {
887			assert_eq!(msg_query_id, query_id);
888			assert_eq!(object.name, "foo");
889			assert_eq!(object.value, json!({ "bar": 1 }));
890		} else {
891			assert!(false);
892		}
893		
894		server.set("foo", json!({ "bar": 1 }), &client).unwrap();
895		
896		let msg = client.inbox_try_next().unwrap().unwrap();
897		
898		if let Message::QueryAdd { query_id: msg_query_id, object } = msg {
899			assert_eq!(msg_query_id, query_id);
900			assert_eq!(object.name, "foo");
901			assert_eq!(object.value, json!({ "bar": 1 }));
902		} else {
903			assert!(false);
904		}
905		
906		assert!(client.inbox_try_next().is_err());
907	}
908	
909	#[test]
910	fn test_emit_event() {
911		let server = create_server();
912		let client = server.client_connect();
913		
914		server.set("gamepad", json!({ "buttons": ["a", "b"] }), &client).unwrap();
915		
916		let mut client = server.client_connect();
917		
918		let (query_id, _) = server.query(&Pattern::compile("*").unwrap(), false, &client).unwrap();
919		
920		server.emit("gamepad", "buttonpress", json!({ "button": "a" }), &client).unwrap();
921		
922		let msg = client.inbox_try_next().unwrap().unwrap();
923		
924		if let Message::QueryEvent { query_id: msg_query_id, object, event, data } = msg {
925			assert_eq!(msg_query_id, query_id);
926			assert_eq!(object, "gamepad");
927			assert_eq!(event, "buttonpress");
928			assert_eq!(data, json!({ "button": "a" }));
929		} else {
930			assert!(false);
931		}
932		
933		assert!(client.inbox_try_next().is_err());
934	}
935	
936	#[test]
937	fn test_emit_event_doesnt_exist() {
938		let server = create_server();
939		let client = server.client_connect();
940		
941		let result = server.emit("gamepad", "buttonpress", json!({ "button": "a" }), &client);
942		
943		assert_eq!(result, Err(Error::ObjectNotFound));
944	}
945	
946	#[test]
947	fn test_invoke_doesnt_exist() {
948		let server = create_server();
949		let client = server.client_connect();
950		
951		let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &client);
952		
953		assert_eq!(result, Err(Error::ObjectNotFound));
954	}
955	
956	#[test]
957	fn test_invoke_not_invokable() {
958		let server = create_server();
959		let client = server.client_connect();
960		
961		server.set("lamp", json!({ "on": false }), &client).unwrap();
962		
963		let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &client);
964		
965		assert_eq!(result, Err(Error::ObjectNotInvocable));
966	}
967	
968	#[test]
969	fn test_invoke() {
970		let server = create_server();
971		let mut provider = server.client_connect();
972		let mut consumer = server.client_connect();
973		
974		server.set("lamp", json!({ "on": false }), &provider).unwrap();
975		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();
976		
977		let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
978		assert_eq!(result, Ok(()));
979		
980		let msg = provider.inbox_try_next().unwrap().unwrap();
981		
982		let invocation_id;
983		
984		if let Message::QueryInvocation { query_id: msg_query_id, invocation_id: msg_invocation_id, object, method, args } = msg {
985			assert_eq!(msg_query_id, query_id);
986			assert_eq!(object, "lamp");
987			assert_eq!(method, "setState");
988			assert_eq!(args, json!({ "on": true }));
989			invocation_id = msg_invocation_id;
990		} else {
991			assert!(false);
992			return;
993		}
994		
995		server.invoke_result(invocation_id, json!({ "success": true }), &provider).unwrap();
996		
997		let msg = consumer.inbox_try_next().unwrap().unwrap();
998		
999		if let Message::InvocationResult { request_id, result } = msg {
1000			assert_eq!(request_id, json!(1));
1001			assert_eq!(result, Ok(json!({ "success": true })));
1002		} else {
1003			assert!(false);
1004		}
1005	}
1006	
1007	#[test]
1008	fn test_invoke_client_disconnect() {
1009		let server = create_server();
1010		let mut provider = server.client_connect();
1011		let mut consumer = server.client_connect();
1012		
1013		server.set("lamp", json!({ "on": false }), &provider).unwrap();
1014		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();
1015		
1016		let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
1017		assert_eq!(result, Ok(()));
1018		
1019		let msg = provider.inbox_try_next().unwrap().unwrap();
1020		
1021		if let Message::QueryInvocation { query_id: msg_query_id, invocation_id: _invocation_id, object, method, args } = msg {
1022			assert_eq!(msg_query_id, query_id);
1023			assert_eq!(object, "lamp");
1024			assert_eq!(method, "setState");
1025			assert_eq!(args, json!({ "on": true }));
1026		} else {
1027			assert!(false);
1028			return;
1029		}
1030		
1031		// disconnect before providing an invocation result
1032		drop(provider);
1033		
1034		let msg = consumer.inbox_try_next().unwrap().unwrap();
1035		
1036		if let Message::InvocationResult { request_id, result } = msg {
1037			assert_eq!(request_id, json!(1));
1038			assert_eq!(result, Err(Error::ObjectNotInvocable));
1039		} else {
1040			assert!(false);
1041		}
1042	}
1043	
1044	#[test]
1045	fn test_invoke_unsubscribe() {
1046		let server = create_server();
1047		let mut provider = server.client_connect();
1048		let mut consumer = server.client_connect();
1049		
1050		server.set("lamp", json!({ "on": false }), &provider).unwrap();
1051		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &provider).unwrap();
1052		
1053		let result = server.invoke("lamp", "setState", json!({ "on": true }), json!(1), &consumer);
1054		assert_eq!(result, Ok(()));
1055		
1056		let msg = provider.inbox_try_next().unwrap().unwrap();
1057		
1058		if let Message::QueryInvocation { query_id: msg_query_id, invocation_id: _invocation_id, object, method, args } = msg {
1059			assert_eq!(msg_query_id, query_id);
1060			assert_eq!(object, "lamp");
1061			assert_eq!(method, "setState");
1062			assert_eq!(args, json!({ "on": true }));
1063		} else {
1064			assert!(false);
1065			return;
1066		}
1067		
1068		// unsubscribe before providing an invocation result
1069		server.unsubscribe(query_id, &provider).unwrap();
1070		
1071		let msg = consumer.inbox_try_next().unwrap().unwrap();
1072		
1073		if let Message::InvocationResult { request_id, result } = msg {
1074			assert_eq!(request_id, json!(1));
1075			assert_eq!(result, Err(Error::ObjectNotInvocable));
1076		} else {
1077			assert!(false);
1078		}
1079	}
1080	
1081	#[test]
1082	fn test_disconnect_command_set() {
1083		let server = create_server();
1084		let mut observer = server.client_connect();
1085		let device = server.client_connect();
1086		
1087		server.set("lamp", json!({ "online": true }), &device).unwrap();
1088		server.set_disconnect_commands(vec![
1089			Command::Set {
1090				name: "lamp".to_string(),
1091				value: json!({ "online": false }),
1092			}
1093		], &device).unwrap();
1094		
1095		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();
1096		
1097		drop(device);
1098		
1099		let msg = observer.inbox_try_next().unwrap().unwrap();
1100		
1101		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
1102			assert_eq!(msg_query_id, query_id);
1103			assert_eq!(object.name, "lamp");
1104			assert_eq!(object.value, json!({ "online": false }));
1105		} else {
1106			assert!(false);
1107		}
1108		
1109		assert!(observer.inbox_try_next().is_err());
1110	}
1111	
1112	#[test]
1113	fn test_disconnect_command_patch() {
1114		let server = create_server();
1115		let mut observer = server.client_connect();
1116		let device = server.client_connect();
1117		
1118		server.patch("lamp", json!({ "online": true }), &device).unwrap();
1119		server.set_disconnect_commands(vec![
1120			Command::Patch {
1121				name: "lamp".to_string(),
1122				value: json!({ "online": false }),
1123			}
1124		], &device).unwrap();
1125		
1126		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();
1127		
1128		drop(device);
1129		
1130		let msg = observer.inbox_try_next().unwrap().unwrap();
1131		
1132		if let Message::QueryChange { query_id: msg_query_id, object } = msg {
1133			assert_eq!(msg_query_id, query_id);
1134			assert_eq!(object.name, "lamp");
1135			assert_eq!(object.value, json!({ "online": false }));
1136		} else {
1137			assert!(false);
1138		}
1139		
1140		assert!(observer.inbox_try_next().is_err());
1141	}
1142	
1143	#[test]
1144	fn test_disconnect_command_remove() {
1145		let server = create_server();
1146		let mut observer = server.client_connect();
1147		let device = server.client_connect();
1148		
1149		server.patch("client", json!({ "online": true }), &device).unwrap();
1150		server.set_disconnect_commands(vec![
1151			Command::Remove {
1152				name: "client".to_string(),
1153			}
1154		], &device).unwrap();
1155		
1156		let (query_id, _) = server.query(&Pattern::compile("client").unwrap(), true, &observer).unwrap();
1157		
1158		drop(device);
1159		
1160		let msg = observer.inbox_try_next().unwrap().unwrap();
1161		
1162		if let Message::QueryRemove { query_id: msg_query_id, object } = msg {
1163			assert_eq!(msg_query_id, query_id);
1164			assert_eq!(object.name, "client");
1165		} else {
1166			assert!(false);
1167		}
1168		
1169		assert!(observer.inbox_try_next().is_err());
1170	}
1171	
1172	#[test]
1173	fn test_disconnect_command_emit() {
1174		let server = create_server();
1175		let mut observer = server.client_connect();
1176		let device = server.client_connect();
1177		
1178		server.set("lamp", json!({ "on": false }), &device).unwrap();
1179		server.set_disconnect_commands(vec![
1180			Command::Emit {
1181				object: "lamp".to_string(),
1182				event: "offline".to_string(),
1183				data: json!({}),
1184			}
1185		], &device).unwrap();
1186		
1187		let (query_id, _) = server.query(&Pattern::compile("lamp").unwrap(), true, &observer).unwrap();
1188		
1189		drop(device);
1190		
1191		let msg = observer.inbox_try_next().unwrap().unwrap();
1192		
1193		if let Message::QueryEvent { query_id: msg_query_id, object, event, data } = msg {
1194			assert_eq!(msg_query_id, query_id);
1195			assert_eq!(object, "lamp");
1196			assert_eq!(event, "offline");
1197			assert_eq!(data, json!({}));
1198		} else {
1199			assert!(false);
1200		}
1201		
1202		assert!(observer.inbox_try_next().is_err());
1203	}
1204}