use flo_scene::*;
use flo_scene::commands::*;
use flo_scene::programs::*;
use flo_scene_pipe::*;
use flo_scene_pipe::commands::*;
use futures::prelude::*;
use tokio::io::*;
use serde::*;
/// Sends an `error::message` command through an internal socket, closes the write side and
/// drains whatever comes back until the read side fails.
///
/// The test only requires that the driver subprogram gets as far as sending `TestSucceeded`:
/// the bytes read back are printed but never inspected.
#[test]
fn error_from_internal_socket() {
    let scene = Scene::default();
    let test_program = SubProgramId::new();

    // Message the driver subprogram emits when it has finished; the `TestBuilder` below waits for it
    #[derive(Serialize, Deserialize)]
    struct TestSucceeded;
    impl SceneMessage for TestSucceeded { }

    // Program that handles the command connections
    let command_program = SubProgramId::new();
    scene.add_subprogram(command_program, |input, context| command_connection_program(input, context, ()), 0);

    // Internal socket program, with its socket messages routed to the command program
    let socket_program = SubProgramId::new();
    start_internal_socket_program(&scene, socket_program, read_command_data, write_command_data).unwrap();
    scene.connect_programs(socket_program, command_program, StreamId::with_message_type::<CommandProgramSocketMessage>()).unwrap();

    // Driver: creates a connection, writes the command and reads the response
    scene.add_subprogram(SubProgramId::new(), |_input: InputStream<()>, context| async move {
        let test_commands = "error::message [ \"json\", \"array\" ]\n";

        // `their_side` is handed to the socket program, `our_side` is used by this test
        let (our_side, their_side) = duplex(1024);
        let (command_input, command_output) = split(their_side);
        let (mut read_result, mut write_command) = split(our_side);

        let mut socket_sink = context.send(socket_program).unwrap();
        socket_sink.send(InternalSocketMessage::CreateInternalSocket(Box::new(command_input), Box::new(command_output))).await.ok().unwrap();

        // Write the command, then shut the write half down before reading anything back
        println!("> {:?}", test_commands);
        write_command.write_all(test_commands.as_bytes()).await.unwrap();
        write_command.shutdown().await.unwrap();

        // Echo the response one byte at a time; the loop ends on the first failed read
        while let Ok(msg) = read_result.read_u8().await {
            println!("{:?}", msg as char);
        }

        println!("DONE");
        context.send_message(TestSucceeded).await.ok();
    }, 0);

    TestBuilder::new()
        .expect_message(|_: TestSucceeded| Ok(()))
        .run_in_scene_with_threads(&scene, test_program, 5);
}
/// Sends `command` to a command connection and reads the response until a `!` byte shows up,
/// keeping the write side of the connection open while reading.
///
/// The scene contains only the command connection program and the internal socket program, so
/// the command is presumably unknown and answered with an error (TODO confirm that error
/// responses always contain `!`). The write half is shut down only after the read loop ends.
///
/// NOTE(review): if reading fails before a `!` is seen, the loop still exits and
/// `TestSucceeded` is still sent, so a missing error response is not detected by this helper.
fn run_expected_error_command_without_closing(command: impl Into<String>) {
    let test_commands: String = command.into();
    let scene = Scene::default();
    let test_program = SubProgramId::new();

    // Message the driver subprogram emits when it has finished; the `TestBuilder` below waits for it
    #[derive(Serialize, Deserialize)]
    struct TestSucceeded;
    impl SceneMessage for TestSucceeded { }

    // Program that handles the command connections
    let command_program = SubProgramId::new();
    scene.add_subprogram(command_program, |input, context| command_connection_program(input, context, ()), 0);

    // Internal socket program, with its socket messages routed to the command program
    let socket_program = SubProgramId::new();
    start_internal_socket_program(&scene, socket_program, read_command_data, write_command_data).unwrap();
    scene.connect_programs(socket_program, command_program, StreamId::with_message_type::<CommandProgramSocketMessage>()).unwrap();

    // Driver: creates a connection, writes the command and waits for the `!` marker
    scene.add_subprogram(SubProgramId::new(), move |_input: InputStream<()>, context| async move {
        // `their_side` is handed to the socket program, `our_side` is used by this test
        let (our_side, their_side) = duplex(1024);
        let (command_input, command_output) = split(their_side);
        let (mut read_result, mut write_command) = split(our_side);

        let mut socket_sink = context.send(socket_program).unwrap();
        socket_sink.send(InternalSocketMessage::CreateInternalSocket(Box::new(command_input), Box::new(command_output))).await.ok().unwrap();

        // The write half is left open here: the response has to arrive without an end-of-input
        println!("> {:?}", test_commands);
        write_command.write_all(test_commands.as_bytes()).await.unwrap();

        // Echo the response one byte at a time, stopping at the first `!` (or the first failed read)
        while let Ok(msg) = read_result.read_u8().await {
            println!("{:?}", msg as char);

            if msg == b'!' {
                break;
            }
        }

        write_command.shutdown().await.unwrap();

        println!("DONE");
        context.send_message(TestSucceeded).await.ok();
    }, 0);

    TestBuilder::new()
        .expect_message(|_: TestSucceeded| Ok(()))
        .run_in_scene_with_threads(&scene, test_program, 5);
}
/// Runs the open-socket error check with a bare `test` command (no parameter).
#[test]
fn error_from_command_without_closing_socket() {
    let command = "test\n";
    run_expected_error_command_without_closing(command);
}
/// Runs the open-socket error check with a `test` command that carries a JSON string parameter.
#[test]
fn error_from_command_with_string_parameter_without_closing_socket() {
    let command = "test \"string\"\n";
    run_expected_error_command_without_closing(command);
}
/// Runs the open-socket error check with a `test` command that carries a negative number parameter.
#[test]
fn error_from_command_with_number_parameter_without_closing_socket() {
    let command = "test -1234\n";
    run_expected_error_command_without_closing(command);
}
/// Runs the open-socket error check with a `test` command that carries the constant `true`.
#[test]
fn error_from_command_with_const_parameter_without_closing_socket() {
    let command = "test true\n";
    run_expected_error_command_without_closing(command);
}
/// Runs the open-socket error check with a `test` command that carries a JSON array parameter.
#[test]
fn error_from_command_with_array_parameter_without_closing_socket() {
    let command = "test [ 1, 2, 3, 4 ]\n";
    run_expected_error_command_without_closing(command);
}
/// Runs the open-socket error check with a `test` command that carries a JSON object parameter.
#[test]
fn error_from_command_with_object_parameter_without_closing_socket() {
    let command = "test { \"test\": 2 }\n";
    run_expected_error_command_without_closing(command);
}
/// Runs a `test` command whose handler returns a two-item background stream and waits until all
/// of the expected markers have appeared in the connection's output.
///
/// The handler answers with `CommandResponse::BackgroundStream` yielding the JSON strings
/// `"one"` and `"two"`. The driver accumulates the output and stops reading once it contains
/// every entry of `EXPECTED_OUTPUT`, in any order.
///
/// This function is a test in its own right and is also called by the other background-stream
/// tests in this file.
///
/// NOTE(review): if reading fails before every marker has been seen, the loop still exits and
/// `TestSucceeded` is still sent, so missing output is not detected here.
#[test]
fn read_from_background_stream_iteration() {
    // Fragments that must all be present in the output before the driver stops reading
    const EXPECTED_OUTPUT: [&str; 4] = ["<<< 0", "<0 \"one\"", "<0 \"two\"", "=== 0"];

    let test_commands = "test\n".to_string();
    let scene = Scene::default();
    let test_program = SubProgramId::new();

    // Message the driver subprogram emits when it has finished; the `TestBuilder` below waits for it
    #[derive(Serialize, Deserialize)]
    struct TestSucceeded;
    impl SceneMessage for TestSucceeded { }

    // Program that handles the command connections
    let command_program = SubProgramId::new();
    scene.add_subprogram(command_program, |input, context| command_connection_program(input, context, ()), 0);

    let socket_program = SubProgramId::new();
    start_internal_socket_program(&scene, socket_program, read_command_data, write_command_data).unwrap();

    // Launcher supplying the `test` command, which responds with a background stream of two values
    scene.add_subprogram(SubProgramId::new(),
        CommandLauncher::json()
            .with_json_command("test", |_param: (), _context| async move {
                CommandResponse::BackgroundStream(stream::iter(vec![
                    serde_json::Value::String("one".to_string()),
                    serde_json::Value::String("two".to_string())
                ]).boxed())
            })
            .to_subprogram(),
        0);

    // Route the socket program's messages to the command program
    scene.connect_programs(socket_program, command_program, StreamId::with_message_type::<CommandProgramSocketMessage>()).unwrap();

    // Driver: creates a connection, writes the command and collects the output
    scene.add_subprogram(SubProgramId::new(), move |_input: InputStream<()>, context| async move {
        // `their_side` is handed to the socket program, `our_side` is used by this test
        let (our_side, their_side) = duplex(1024);
        let (command_input, command_output) = split(their_side);
        let (mut read_result, mut write_command) = split(our_side);

        let mut socket_sink = context.send(socket_program).unwrap();
        socket_sink.send(InternalSocketMessage::CreateInternalSocket(Box::new(command_input), Box::new(command_output))).await.ok().unwrap();

        println!("> {:?}", test_commands);
        write_command.write_all(test_commands.as_bytes()).await.unwrap();

        // Accumulate the output byte by byte until every expected fragment has been seen
        let mut characters = String::new();
        while let Ok(msg) = read_result.read_u8().await {
            characters.push(msg as char);
            println!("{:?}", msg as char);

            if EXPECTED_OUTPUT.iter().all(|marker| characters.contains(*marker)) {
                break;
            }
        }

        write_command.shutdown().await.unwrap();

        println!("DONE");
        context.send_message(TestSucceeded).await.ok();
    }, 0);

    TestBuilder::new()
        .expect_message(|_: TestSucceeded| Ok(()))
        .run_in_scene_with_threads(&scene, test_program, 5);
}
/// Runs a `test` command whose handler returns an interactive stream of raw bytes and waits
/// until the raw payload and both `RAW` markers have appeared in the connection's output.
///
/// The handler answers with `CommandResponse::InteractiveStream`, producing a single chunk
/// containing `"\n\nRAW OUTPUT\n\n"`. The driver accumulates the output and stops reading once
/// it contains every entry of `EXPECTED_OUTPUT`, in any order.
///
/// NOTE(review): if reading fails before every marker has been seen, the loop still exits and
/// `TestSucceeded` is still sent, so missing output is not detected here.
#[test]
fn send_raw_data_to_command_connection() {
    // Fragments that must all be present in the output before the driver stops reading
    const EXPECTED_OUTPUT: [&str; 3] = ["\nRAW OUTPUT\n", "\n<< RAW <<\n", "\n== RAW ==\n"];

    let test_commands = "test\n".to_string();
    let scene = Scene::default();
    let test_program = SubProgramId::new();

    // Message the driver subprogram emits when it has finished; the `TestBuilder` below waits for it
    #[derive(Serialize, Deserialize)]
    struct TestSucceeded;
    impl SceneMessage for TestSucceeded { }

    // Program that handles the command connections
    let command_program = SubProgramId::new();
    scene.add_subprogram(command_program, |input, context| command_connection_program(input, context, ()), 0);

    let socket_program = SubProgramId::new();
    start_internal_socket_program(&scene, socket_program, read_command_data, write_command_data).unwrap();

    // Launcher supplying the `test` command, which responds with one chunk of raw bytes
    scene.add_subprogram(SubProgramId::new(),
        CommandLauncher::json()
            .with_json_command("test", |_param: (), _context| async move {
                CommandResponse::InteractiveStream(Box::new(
                    |_| stream::iter(vec!["\n\nRAW OUTPUT\n\n".as_bytes().to_vec()]).boxed())
                )
            })
            .to_subprogram(),
        0);

    // Route the socket program's messages to the command program
    scene.connect_programs(socket_program, command_program, StreamId::with_message_type::<CommandProgramSocketMessage>()).unwrap();

    // Driver: creates a connection, writes the command and collects the output
    scene.add_subprogram(SubProgramId::new(), move |_input: InputStream<()>, context| async move {
        // `their_side` is handed to the socket program, `our_side` is used by this test
        let (our_side, their_side) = duplex(1024);
        let (command_input, command_output) = split(their_side);
        let (mut read_result, mut write_command) = split(our_side);

        let mut socket_sink = context.send(socket_program).unwrap();
        socket_sink.send(InternalSocketMessage::CreateInternalSocket(Box::new(command_input), Box::new(command_output))).await.ok().unwrap();

        println!("> {:?}", test_commands);
        write_command.write_all(test_commands.as_bytes()).await.unwrap();

        // Accumulate the output byte by byte until every expected fragment has been seen
        let mut characters = String::new();
        while let Ok(msg) = read_result.read_u8().await {
            characters.push(msg as char);
            println!("{:?}", msg as char);

            if EXPECTED_OUTPUT.iter().all(|marker| characters.contains(*marker)) {
                break;
            }
        }

        write_command.shutdown().await.unwrap();

        println!("DONE");
        context.send_message(TestSucceeded).await.ok();
    }, 0);

    TestBuilder::new()
        .expect_message(|_: TestSucceeded| Ok(()))
        .run_in_scene_with_threads(&scene, test_program, 5);
}
/// Runs the background-stream scenario a single time.
#[test]
fn read_from_background_stream() {
    read_from_background_stream_iteration()
}
/// Runs the background-stream scenario 100 times in a row, building a fresh scene on each pass.
#[test]
fn read_from_background_stream_repeat() {
    (0..100).for_each(|_| read_from_background_stream_iteration());
}