use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use futures::future::try_join_all;
use serde_json::json;
use flame_rs as flame;
use flame::{
apis::{FlameError, SessionState, Shim, TaskState},
client::{ApplicationAttributes, ApplicationSchema, SessionAttributes, Task, TaskInformer},
lock_ptr, new_ptr,
};
const FLAME_DEFAULT_ADDR: &str = "http://127.0.0.1:8080";
const FLAME_DEFAULT_APP: &str = "flmping";
pub struct DefaultTaskInformer {
pub succeed: i32,
pub failed: i32,
pub error: i32,
}
impl TaskInformer for DefaultTaskInformer {
fn on_update(&mut self, task: Task) {
tracing::info!("task: {:?}", task.state);
match task.state {
TaskState::Succeed => self.succeed += 1,
TaskState::Failed => self.failed += 1,
_ => {}
}
}
fn on_error(&mut self, _: FlameError) {
self.error += 1;
tracing::info!("error: {}", self.error);
}
}
#[tokio::test]
async fn test_create_session() -> Result<(), FlameError> {
let conn = flame::client::connect(FLAME_DEFAULT_ADDR).await?;
let ssn_attr = SessionAttributes {
application: FLAME_DEFAULT_APP.to_string(),
slots: 1,
common_data: None,
};
let ssn = conn.create_session(&ssn_attr).await?;
assert_eq!(ssn.state, SessionState::Open);
ssn.close().await?;
Ok(())
}
#[tokio::test]
async fn test_create_multiple_sessions() -> Result<(), FlameError> {
let conn = flame::client::connect(FLAME_DEFAULT_ADDR).await?;
let ssn_num = 10;
for _ in 0..ssn_num {
let ssn_attr = SessionAttributes {
application: FLAME_DEFAULT_APP.to_string(),
slots: 1,
common_data: None,
};
let ssn = conn.create_session(&ssn_attr).await?;
assert_eq!(ssn.state, SessionState::Open);
ssn.close().await?;
}
Ok(())
}
#[tokio::test]
async fn test_create_session_with_tasks() -> Result<(), FlameError> {
let conn = flame::client::connect(FLAME_DEFAULT_ADDR).await?;
let ssn_attr = SessionAttributes {
application: FLAME_DEFAULT_APP.to_string(),
slots: 1,
common_data: None,
};
let ssn = conn.create_session(&ssn_attr).await?;
assert_eq!(ssn.state, SessionState::Open);
let informer = new_ptr!(DefaultTaskInformer {
succeed: 0,
failed: 0,
error: 0,
});
let task_num = 100;
let mut tasks = vec![];
for _ in 0..task_num {
let task = ssn.run_task(None, informer.clone());
tasks.push(task);
}
try_join_all(tasks).await?;
{
let informer = lock_ptr!(informer)?;
assert_eq!(informer.succeed, task_num);
}
ssn.close().await?;
Ok(())
}
#[tokio::test]
async fn test_create_multiple_sessions_with_tasks() -> Result<(), FlameError> {
let conn = flame::client::connect(FLAME_DEFAULT_ADDR).await?;
let ssn_attr = SessionAttributes {
application: FLAME_DEFAULT_APP.to_string(),
slots: 1,
common_data: None,
};
let ssn_1 = conn.create_session(&ssn_attr).await?;
assert_eq!(ssn_1.state, SessionState::Open);
let ssn_2 = conn.create_session(&ssn_attr).await?;
assert_eq!(ssn_2.state, SessionState::Open);
let informer = new_ptr!(DefaultTaskInformer {
succeed: 0,
failed: 0,
error: 0,
});
let task_num = 100;
let mut tasks = vec![];
for _ in 0..task_num {
let task = ssn_1.run_task(None, informer.clone());
tasks.push(task);
}
for _ in 0..task_num {
let task = ssn_2.run_task(None, informer.clone());
tasks.push(task);
}
try_join_all(tasks).await?;
{
let informer = lock_ptr!(informer)?;
assert_eq!(informer.succeed, task_num * 2);
}
ssn_1.close().await?;
ssn_2.close().await?;
Ok(())
}
#[tokio::test]
async fn test_application_lifecycle() -> Result<(), FlameError> {
let conn = flame::client::connect(FLAME_DEFAULT_ADDR).await?;
let string_schema = json!({
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "string",
"description": "The string for testing."
});
let apps = vec![
(
"my-test-agent-1".to_string(),
ApplicationAttributes {
shim: Shim::Host,
image: Some("may-agent".to_string()),
description: Some("This is my agent for testing.".to_string()),
labels: vec!["test".to_string(), "agent".to_string()],
command: Some("my-agent".to_string()),
arguments: vec!["--test".to_string(), "--agent".to_string()],
environments: HashMap::from([("TEST".to_string(), "true".to_string())]),
working_directory: Some("/tmp".to_string()),
max_instances: Some(10),
delay_release: None,
schema: Some(ApplicationSchema {
input: Some(string_schema.to_string()),
output: Some(string_schema.to_string()),
common_data: None,
}),
},
),
(
"empty-app".to_string(),
ApplicationAttributes {
shim: Shim::Host,
image: None,
description: None,
labels: vec![],
command: None,
arguments: vec![],
environments: HashMap::new(),
working_directory: None,
max_instances: None,
delay_release: None,
schema: None,
},
),
];
for (name, app_attr) in apps {
conn.register_application(name.clone(), app_attr)
.await
.map_err(|e| {
FlameError::Internal(format!("failed to register application <{name}>: {e}"))
})?;
}
Ok(())
}