use std::{
collections::BTreeMap,
sync::{Arc, LazyLock, Mutex},
time::{Duration, Instant},
vec,
};
use std::str::FromStr;
use iotscape::*;
static SERVER: LazyLock<String> = LazyLock::new(|| std::env::var("IOTSCAPE_SERVER").unwrap_or("127.0.0.1:1978".to_string()));
static ANNOUNCE_ENDPOINT: LazyLock<String> = LazyLock::new(|| std::env::var("IOTSCAPE_ANNOUNCE_ENDPOINT").unwrap_or("http://localhost:8080/routes/iotscape/announce".to_string()));
static RESPONSE_ENDPOINT: LazyLock<String> = LazyLock::new(|| std::env::var("IOTSCAPE_RESPONSE_ENDPOINT").unwrap_or("http://localhost:8080/routes/iotscape/response".to_string()));
#[tokio::main]
async fn main() {
simple_logger::init_with_level(log::Level::Info).unwrap();
let mut definition = ServiceDefinition {
id: "rs1".to_owned(),
methods: BTreeMap::new(),
events: BTreeMap::new(),
description: IoTScapeServiceDescription {
description: Some("Test IoTScape service.".to_owned()),
externalDocumentation: None,
termsOfService: None,
contact: Some("gstein@ltu.edu".to_owned()),
license: None,
version: "1".to_owned(),
},
};
definition.methods.insert(
"helloWorld".to_owned(),
MethodDescription {
documentation: Some("Says \"Hello, World!\"".to_owned()),
params: vec![],
returns: MethodReturns {
documentation: Some("The text \"Hello, World!\"".to_owned()),
r#type: vec!["string".to_owned()],
},
},
);
definition.methods.insert(
"add".to_owned(),
MethodDescription {
documentation: Some("Adds two numbers".to_owned()),
params: vec![
MethodParam {
name: "a".to_owned(),
documentation: Some("First number".to_owned()),
r#type: "number".to_owned(),
optional: false,
},
MethodParam {
name: "b".to_owned(),
documentation: Some("Second number".to_owned()),
r#type: "number".to_owned(),
optional: false,
},
],
returns: MethodReturns {
documentation: Some("The sum of a and b".to_owned()),
r#type: vec!["number".to_owned()],
},
},
);
definition.methods.insert(
"timer".to_owned(),
MethodDescription {
documentation: Some("Sends timer event on a delay".to_owned()),
params: vec![MethodParam {
name: "msec".to_owned(),
documentation: Some("Amount of time to wait, in ms".to_owned()),
r#type: "number".to_owned(),
optional: false,
}],
returns: MethodReturns {
documentation: Some("Response after delay".to_owned()),
r#type: vec!["event timer".to_owned()],
},
},
);
definition.methods.insert(
"returnComplex".to_owned(),
MethodDescription {
documentation: Some("Complex response to method".to_owned()),
params: vec![],
returns: MethodReturns {
documentation: Some("Complex object".to_owned()),
r#type: vec!["string".to_owned(), "string".to_owned()],
},
},
);
definition.events.insert(
"timer".to_owned(),
EventDescription { params: vec![] },
);
let service: Arc<Mutex<IoTScapeService>> = Arc::from(Mutex::new(IoTScapeService::new(
"ExampleService",
definition,
SERVER.parse().unwrap(),
)));
if let Err(e) = service
.lock()
.unwrap()
.announce() {
println!("Could not announce to server: {}", e);
}
let mut last_announce = Instant::now();
let announce_period = Duration::from_secs(30);
let service_clone = Arc::clone(&service);
tokio::task::spawn(async move {
let service = service_clone;
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
service.lock().unwrap().poll(Some(Duration::from_millis(1)));
if last_announce.elapsed() > announce_period {
println!("Re-announcing to server");
if let Err(e) = service
.lock()
.unwrap()
.announce() {
println!("Could not announce to server: {}", e);
}
last_announce = Instant::now();
}
loop {
if service.lock().unwrap().rx_queue.len() == 0 {
break;
}
let next_msg = service.lock().unwrap().rx_queue.pop_front().unwrap();
println!("Handling message {:?}", next_msg);
match next_msg.function.as_str() {
"helloWorld" => {
service
.lock()
.unwrap()
.enqueue_response_to(next_msg, Ok(vec!["Hello, World!".to_owned().into()])).unwrap();
},
"add" => {
let result: f64 = next_msg
.params
.iter()
.map(|v|
match v {
serde_json::Value::Number(n) => n.as_f64().unwrap_or_default(),
serde_json::Value::String(s) => f64::from_str(&s).unwrap_or_default(),
_ => 0.0,
})
.sum();
let service: Arc<Mutex<IoTScapeService>> = service.clone();
tokio::task::spawn_blocking(move || {
service
.lock()
.unwrap()
.enqueue_response_to_http(&RESPONSE_ENDPOINT, next_msg, Ok(vec![result.to_string().into()])).unwrap();
});
},
"timer" => {
let ms = next_msg
.params
.get(0).and_then(|x| u64::from_str_radix(&x.to_string(), 10).ok())
.unwrap_or(0);
tokio::spawn(delayed_event(
Arc::clone(&service),
ms,
next_msg.id.clone(),
"timer",
BTreeMap::new(),
));
service
.lock()
.unwrap()
.enqueue_response_to(next_msg, Ok(vec![])).unwrap();
},
"returnComplex" => {
let image = std::fs::read("examples/figure.png").expect("Could not read image file");
let image = "<costume name=\"costume\" collabId=\"\" center-x=\"43.5\" center-y=\"62\" image=\"data:image/png;base64,".to_string() + base64::encode(&image).as_str() + "\"/>";
let service: Arc<Mutex<IoTScapeService>> = service.clone();
tokio::task::spawn_blocking(move || {
service.lock().unwrap()
.enqueue_response_to_http(&RESPONSE_ENDPOINT, next_msg, Ok(vec![vec![Into::<serde_json::Value>::into("test"), vec![1, 2, 3].into(), vec![image].into()].into()])).expect("Could not enqueue response");
});
},
"_requestedKey" => {
println!("Received key: {:?}", next_msg.params);
service
.lock()
.unwrap()
.enqueue_response_to(next_msg, Ok(vec![])).unwrap();
},
t => {
println!("Unrecognized function {}", t);
}
}
}
}
});
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let mut parts = input.split_whitespace();
let command = parts.next().unwrap_or_default();
let _args = parts.collect::<Vec<&str>>();
match command {
"announce" => {
service.lock().unwrap().announce().expect("Could not announce to server");
},
"announcehttp" => {
let service = Arc::clone(&service);
let announce_endpoint = ANNOUNCE_ENDPOINT.clone();
tokio::task::spawn_blocking(move || {
service.lock().unwrap().announce_http(&announce_endpoint).expect("Could not announce to server");
}).await.expect("Could not spawn blocking task");
},
"announcelite" => {
service.lock().unwrap().announce_lite().expect("Could not announce to server");
},
"getkey" => {
let mut s = service.lock().unwrap();
let next_msg_id = s.next_msg_id.to_string();
s.send_event(&next_msg_id, "_requestKey", BTreeMap::default()).unwrap();
s.next_msg_id += 1;
},
"reset" => {
let mut s = service.lock().unwrap();
let next_msg_id = s.next_msg_id.to_string();
s.send_event(&next_msg_id, "_reset", BTreeMap::default()).unwrap();
s.next_msg_id += 1;
},
"help" => {
println!("Commands:");
println!(" announce - send a new announce to the server");
println!(" announcehttp - send a new announce to the server via HTTP");
println!(" announcelite - send a new announce to the server with minimal information");
println!(" getkey - request a key from the server");
println!(" reset - reset the encryption settings on the server");
println!(" quit - exit the program");
},
"quit" => {
break;
},
_ => {
println!("Unrecognized command {}", command);
}
}
}
}
async fn delayed_event(
service: Arc<Mutex<IoTScapeService>>,
delay: u64,
call_id: String,
event_type: &str,
args: BTreeMap<String, String>,
) {
tokio::time::sleep(Duration::from_millis(delay)).await;
println!("Sending event {} with args {:?} after {} ms", event_type, args, delay);
service
.lock()
.unwrap()
.send_event(call_id.as_str(), event_type, args).unwrap();
}