#[cfg(test)]
#[cfg(feature = "std")]
mod pk_command_integration_tests {
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
use pk_command::{PkCommand, PkCommandConfig, types::Operation as PkOperation};
use pk_command::{PkHashmapMethod, PkHashmapVariable, PkPromise};
/// Short fixture payload: the initial value registered for the device-side variable
/// `"VARIA"` and the value the host is expected to read back from it.
const VARIA: &[u8] = b"variable value";
/// Long fixture payload: the initial value of the device-side variable `"LONGV"`, and also
/// used as the data for the long SendVariable and long echo tests.
// NOTE(review): presumably chosen to exceed the `64` passed to `PkCommandConfig::default`
// so the exchange spans several packets — confirm against the crate docs.
const LONGV: &[u8] =b"(this is a very long string)Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
fn threads_simulation(
operation: PkOperation,
object: Option<String>,
data: Option<Vec<u8>>,
then: Box<dyn Fn(Vec<u8>) -> ()>,
) {
let (host_tx, device_rx) = channel::<Vec<u8>>(); let (device_tx, host_rx) = channel::<Vec<u8>>();
let host_thread = thread::Builder::new()
.name("HostThread".to_string())
.spawn(move || {
println!("[Host] Thread started");
let var_accessor = PkHashmapVariable::new(vec![]);
let method_accessor = PkHashmapMethod::new(vec![]);
let host_pkc = PkCommand::<_, _, std::time::Instant>::new(
PkCommandConfig::default(64),
var_accessor,
method_accessor,
);
host_pkc
.perform(operation, object.clone(), data)
.expect(&format!("Host failed to perform {:?}", operation));
println!("[Host] Performed {:?} for {:?}", operation, object);
let mut data: Vec<u8> = b"failed".into();
for i in 0..10000 {
if let Some(cmd_to_send) = host_pkc.poll() {
println!("[Host] Sending (iter {}): {}", i, cmd_to_send);
if host_tx.send(cmd_to_send.to_bytes()).is_err() {
break;
} }
match host_rx.try_recv() {
Ok(received_bytes) => {
println!(
"[Host] Received {} bytes (iter {})",
received_bytes.len(),
i
);
if let Err(e) = host_pkc.incoming_command(received_bytes) {
println!("[Host] Error processing incoming command: {}", e);
break;
}
}
Err(std::sync::mpsc::TryRecvError::Empty) => { }
Err(_) => {
break;
} }
if host_pkc.is_complete() {
println!("[Host] Transaction complete (iter {}).", i);
if let Some(ret_data) = host_pkc.get_return_data() {
println!(
"[Host] Got return data: {:?}",
String::from_utf8_lossy(&ret_data)
);
data = ret_data;
} else {
println!("[Host] No return data, but transaction complete.");
}
break;
}
thread::sleep(Duration::from_millis(10)); }
println!("[Host] Thread finished.",);
data })
.unwrap();
let device_thread = thread::Builder::new()
.name("DeviceThread".to_string())
.spawn(move || {
println!("[Device] Thread started");
let variable_listener = move |name: &'static str| {
return move |_: Vec<u8>| {
println!("[Variable Accessor] {} is changed", name);
};
};
let var_accessor = PkHashmapVariable::new(vec![
(
String::from("VARIA"),
Some(VARIA.to_vec()),
Box::new(variable_listener("VARIA")),
),
(
String::from("LONGV"),
Some(LONGV.to_vec()),
Box::new(variable_listener("LONGV")),
),
]);
let method_accessor = PkHashmapMethod::new(vec![
(
String::from("ECHOO"),
Box::new(|param| {
PkPromise::execute(|resolve| {
resolve(param.unwrap_or(b"empty".to_vec()))
})
}),
),
(
String::from("DEVID"),
Box::new(|_| PkPromise::execute(|resolve| resolve(b"device_123".to_vec()))),
),
(
String::from("LONGO"),
Box::new(|_| {
PkPromise::execute(|resolve| {
thread::sleep(Duration::from_secs(2));
resolve(b"long_op_done".to_vec())
})
}),
),
]);
let device_pkc = PkCommand::<_, _, std::time::Instant>::new(
PkCommandConfig::default(64),
var_accessor,
method_accessor,
);
for i in 0..10000 {
match device_rx.try_recv() {
Ok(received_bytes) => {
println!(
"[Device] Received {} bytes (iter {})",
received_bytes.len(),
i
);
if let Err(e) = device_pkc.incoming_command(received_bytes) {
println!("[Device] Error processing incoming command: {}", e);
break;
}
}
Err(std::sync::mpsc::TryRecvError::Empty) => { }
Err(_) => {
break;
} }
if let Some(cmd_to_send) = device_pkc.poll() {
println!("[Device] Sending (iter {}): {}", i, cmd_to_send);
if device_tx.send(cmd_to_send.to_bytes()).is_err() {
break;
} }
if device_pkc.is_complete() {
println!("[Device] Became idle and complete (iter {}).", i);
}
thread::sleep(Duration::from_millis(10));
}
println!("[Device] Thread finished");
})
.unwrap();
let host_result = host_thread.join().expect("Host thread panicked");
device_thread.join().expect("Device thread panicked");
then(host_result);
}
/// The host requests the short device variable `VARIA` and must receive the value the
/// device registered for it.
#[test]
fn test_requv_simulation() {
    threads_simulation(
        PkOperation::RequireVariable,
        Some("VARIA".to_string()),
        None,
        // `Vec<u8>` compares directly against the `&[u8]` constant, so no copy is needed.
        Box::new(|data: Vec<u8>| assert_eq!(data, VARIA)),
    );
}
/// The host requests the long device variable `LONGV` and must receive the complete,
/// unmodified payload.
#[test]
fn test_long_requv_simulation() {
    threads_simulation(
        PkOperation::RequireVariable,
        Some("LONGV".to_string()),
        None,
        // `Vec<u8>` compares directly against the `&[u8]` constant, so no copy is needed.
        Box::new(|data: Vec<u8>| assert_eq!(data, LONGV)),
    );
}
/// The host sends a new short value for the device variable `VARIA`.
///
/// `threads_simulation` hands the callback the placeholder `b"failed"` whenever the host
/// ends up without return data, so that placeholder is the expected value here.
// NOTE(review): the callback only sees the host-side result; this test does not check that
// the device actually stored the new value.
#[test]
fn test_sendv_simulation() {
    threads_simulation(
        PkOperation::SendVariable,
        Some("VARIA".to_string()),
        Some(b"new value".to_vec()),
        Box::new(|data: Vec<u8>| assert_eq!(data, b"failed")),
    );
}
/// The host sends the long `LONGV` payload as the new value of the device variable `LONGV`.
///
/// `threads_simulation` hands the callback the placeholder `b"failed"` whenever the host
/// ends up without return data, so that placeholder is the expected value here.
// NOTE(review): the callback only sees the host-side result; this test does not check that
// the device actually stored the new value.
#[test]
fn test_long_sendv_simulation() {
    threads_simulation(
        PkOperation::SendVariable,
        Some("LONGV".to_string()),
        Some(LONGV.to_vec()),
        Box::new(|data: Vec<u8>| assert_eq!(data, b"failed")),
    );
}
/// The host invokes the device method `ECHOO` with a short parameter and must get the same
/// bytes back.
#[test]
fn test_invok_echo_simulation() {
    threads_simulation(
        PkOperation::Invoke,
        Some("ECHOO".to_string()),
        Some(b"echo this back".to_vec()),
        Box::new(|data: Vec<u8>| assert_eq!(data, b"echo this back")),
    );
}
/// The host invokes the device method `ECHOO` with the long `LONGV` payload and must get
/// the complete payload back.
#[test]
fn test_invok_long_echo_simulation() {
    threads_simulation(
        PkOperation::Invoke,
        Some("ECHOO".to_string()),
        Some(LONGV.to_vec()),
        // `Vec<u8>` compares directly against the `&[u8]` constant, so no copy is needed.
        Box::new(|data: Vec<u8>| assert_eq!(data, LONGV)),
    );
}
/// The host invokes the parameterless device method `DEVID` and must receive the fixed
/// identifier `device_123`.
#[test]
fn test_invok_deviceid_simulation() {
    threads_simulation(
        PkOperation::Invoke,
        Some("DEVID".to_string()),
        None,
        Box::new(|data: Vec<u8>| assert_eq!(data, b"device_123")),
    );
}
/// The host invokes the device method `LONGO`, which sleeps for two seconds before
/// resolving, and must still receive `long_op_done`.
#[test]
fn test_invok_longop_simulation() {
    threads_simulation(
        PkOperation::Invoke,
        Some("LONGO".to_string()),
        None,
        Box::new(|data: Vec<u8>| assert_eq!(data, b"long_op_done")),
    );
}
/// The host performs `GetVersion` and must receive the version string that Cargo exposes
/// as `CARGO_PKG_VERSION` when this test is compiled.
#[test]
fn test_get_version_simulation() {
    threads_simulation(
        PkOperation::GetVersion,
        None,
        None,
        // Compare against the borrowed bytes; building a temporary `Vec` is unnecessary.
        Box::new(|data: Vec<u8>| assert_eq!(data, env!("CARGO_PKG_VERSION").as_bytes())),
    );
}
}