colink 0.3.10

CoLink Rust SDK
Documentation
use rand::Rng;
use std::{
    net::TcpStream,
    process::{Child, Command, Stdio},
};
use tracing::debug;

/// Address the core server is started on and that the tests connect to.
const CORE_ADDR: &str = "127.0.0.1";
/// Value passed to the core server through its `--mq-prefix` option.
const MQ_PREFIX: &str = "colink-test";
/// Number of users per test round: round `i` of `test_main` runs with `USER_NUM[i]`
/// users against a server on port `12300 + i`.
const USER_NUM: [usize; 11] = [2, 2, 2, 2, 2, 3, 3, 4, 4, 5, 5];

/// Owns a spawned child process and force-kills it, together with its direct
/// children, when the wrapper is dropped.
///
/// Cleanup is best-effort: failures of `pkill`, `kill` or `wait` are ignored.
struct KilledWhenDrop(Child);

impl Drop for KilledWhenDrop {
    fn drop(&mut self) {
        // Never panic here: a panic raised while the test is already unwinding
        // would abort the whole test binary and hide the original failure.

        // `pkill -9 -P <pid>` sends SIGKILL to every process whose parent is the
        // wrapped child; they have to go first, while the parent pid is still valid.
        let _ = Command::new("pkill")
            .arg("-9")
            .arg("-P")
            .arg(self.0.id().to_string())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        let _ = self.0.kill();
        // Reap the child so it does not stay around as a zombie for the rest of
        // the test run.
        let _ = self.0.wait();
    }
}

/// Test entry point: installs the tracing subscriber, pre-builds all targets
/// and then runs the greetings scenario once per entry of `USER_NUM`, using
/// consecutive ports starting at 12300. Stops at the first failing round.
#[test]
fn test_main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    build();
    // Pair ports 12300, 12301, ... with the configured user counts.
    let rounds = (12300u16..).zip(USER_NUM.iter().copied());
    for (port, user_num) in rounds {
        test_greetings(port, user_num)?;
    }
    Ok(())
}

fn test_greetings(port: u16, user_num: usize) -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("http://{}:{}", CORE_ADDR, port);
    let mut child_processes = vec![];

    assert!(
        TcpStream::connect(&format!("{}:{}", CORE_ADDR, port)).is_err(),
        "listen {}:{}: address already in use.",
        CORE_ADDR,
        port
    );
    if std::fs::metadata("./tests/colink-server/host_token.txt").is_ok() {
        std::fs::remove_file("./tests/colink-server/host_token.txt")?;
    }
    child_processes.push(KilledWhenDrop(start_core(port)));
    loop {
        if std::fs::metadata("./tests/colink-server/host_token.txt").is_ok()
            && TcpStream::connect(&format!("{}:{}", CORE_ADDR, port)).is_ok()
        {
            break;
        }
        std::thread::sleep(core::time::Duration::from_millis(100));
    }

    let host_token: String =
        String::from_utf8_lossy(&std::fs::read("./tests/colink-server/host_token.txt")?).parse()?;
    let users = host_import_users_and_exchange_guest_jwts(&addr, &host_token, user_num);
    debug!("users:{:?}", users);
    assert!(users.len() == user_num);
    let start_time = chrono::Utc::now().timestamp_nanos();
    let random_number = rand::thread_rng().gen_range(0..1000);

    let mut threads = vec![];
    if user_num == 2 {
        threads.push({
            let users = users.clone();
            let time: u64 = rand::thread_rng().gen_range(0..1000);
            let msg = random_number.to_string();
            let addr = addr.clone();
            std::thread::spawn(move || {
                std::thread::sleep(core::time::Duration::from_millis(time));
                user_run_task(&addr, &users[0], &users[1], &msg)
            })
        });
    } else {
        threads.push({
            let users = users.clone();
            let time: u64 = rand::thread_rng().gen_range(0..1000);
            let addr = addr.clone();
            std::thread::spawn(move || {
                std::thread::sleep(core::time::Duration::from_millis(time));
                user_greetings_to_multiple_users(&addr, &users)
            })
        });
    }
    for i in 1..users.len() {
        threads.push({
            let users = users.clone();
            let time: u64 = rand::thread_rng().gen_range(0..1000);
            let addr = addr.clone();
            std::thread::spawn(move || {
                std::thread::sleep(core::time::Duration::from_millis(time));
                run_auto_confirm(&addr, &users[i], "greetings")
            })
        });
    }
    for user in &users {
        let num: usize = rand::thread_rng().gen_range(1..4); // Generate the number of operators for testing multiple protocol operators.
        for _ in 0..num {
            threads.push({
                let user = user.clone();
                let time: u64 = rand::thread_rng().gen_range(0..1000);
                let addr = addr.clone();
                std::thread::spawn(move || {
                    std::thread::sleep(core::time::Duration::from_millis(time));
                    run_protocol_greetings(&addr, &user)
                })
            });
        }
    }
    for join in threads {
        child_processes.push(KilledWhenDrop(join.join().unwrap()));
    }

    let rnd_receiver = rand::thread_rng().gen_range(1..user_num);
    let msg = get_next_greeting_message(&addr, &users[rnd_receiver], start_time);
    debug!("msg:{:?}", msg);
    if user_num == 2 {
        assert!(msg.parse::<i32>()? == random_number);
    } else {
        assert!(msg == "hello");
    }
    Ok(())
}

/// Starts `./colink-server` from `./tests/colink-server`, listening on
/// `CORE_ADDR:port` with the `MQ_PREFIX` message-queue prefix, and returns the
/// child handle. The server's stdout and stderr are discarded.
///
/// `COLINK_HOME` is set to `<current dir>/tests/colink-server`.
///
/// # Panics
/// Panics if the current directory cannot be determined or the server binary
/// cannot be spawned.
fn start_core(port: u16) -> Child {
    // Passed to `env` as a path rather than via `to_str()`, so a non-UTF-8
    // working directory does not cause a panic.
    let colink_home = std::env::current_dir()
        .expect("current working directory must be accessible")
        .join("tests")
        .join("colink-server");
    Command::new("./colink-server")
        .args([
            "--address",
            CORE_ADDR,
            "--port",
            &port.to_string(),
            "--mq-prefix",
            MQ_PREFIX,
        ])
        .env("COLINK_HOME", &colink_home)
        .current_dir("./tests/colink-server")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .expect("failed to start ./colink-server in ./tests/colink-server")
}

fn build() {
    let mut sdk = Command::new("cargo")
        .args(["build", "--all-targets"])
        .current_dir("./")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .unwrap();
    sdk.wait().unwrap();
}

/// Runs the `host_import_users_and_exchange_guest_jwts` example with `addr`, the
/// host `jwt` and the requested number of users, waits for it to finish and
/// returns the whitespace-separated tokens it printed on stdout.
///
/// The raw output and exit status are logged at debug level.
///
/// # Panics
/// Panics if `cargo` cannot be executed.
fn host_import_users_and_exchange_guest_jwts(addr: &str, jwt: &str, num: usize) -> Vec<String> {
    let user_count = num.to_string();
    let mut cargo = Command::new("cargo");
    cargo
        .args(["run", "--example"])
        .arg("host_import_users_and_exchange_guest_jwts")
        .args([addr, jwt, user_count.as_str()])
        .current_dir("./");
    let output = cargo.output().unwrap();
    debug!(
        "res:{:?}, {:?}, {}",
        output.stdout,
        String::from_utf8_lossy(&output.stderr),
        output.status
    );
    let stdout_text = String::from_utf8_lossy(&output.stdout).to_string();
    debug!("res:{:?}", stdout_text);
    // Each whitespace-separated token on stdout is one entry of the result.
    stdout_text.split_whitespace().map(str::to_owned).collect()
}

/// Spawns `cargo run --example user_run_task <addr> <jwt_a> <jwt_b> <msg>` with
/// stdout and stderr discarded and returns the child handle without waiting.
///
/// # Panics
/// Panics if `cargo` cannot be spawned.
fn user_run_task(addr: &str, jwt_a: &str, jwt_b: &str, msg: &str) -> Child {
    let mut cargo = Command::new("cargo");
    cargo
        .args(["run", "--example", "user_run_task"])
        .args([addr, jwt_a, jwt_b, msg])
        .current_dir("./")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    cargo.spawn().unwrap()
}

/// Spawns `cargo run --example user_greetings_to_multiple_users <addr> <jwt>...`
/// with every entry of `jwts` appended in order, stdout and stderr discarded,
/// and returns the child handle without waiting.
///
/// # Panics
/// Panics if `cargo` cannot be spawned.
fn user_greetings_to_multiple_users(addr: &str, jwts: &[String]) -> Child {
    let mut cargo = Command::new("cargo");
    cargo
        .args(["run", "--example", "user_greetings_to_multiple_users", addr])
        // The tokens follow the fixed arguments, one argument per token.
        .args(jwts)
        .current_dir("./")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    cargo.spawn().unwrap()
}

/// Spawns `cargo run --example auto_confirm <addr> <jwt> <protocol_name>` with
/// stdout and stderr discarded and returns the child handle without waiting.
///
/// # Panics
/// Panics if `cargo` cannot be spawned.
fn run_auto_confirm(addr: &str, jwt: &str, protocol_name: &str) -> Child {
    let mut cargo = Command::new("cargo");
    cargo
        .args(["run", "--example", "auto_confirm"])
        .args([addr, jwt, protocol_name])
        .current_dir("./")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    cargo.spawn().unwrap()
}

/// Spawns `cargo run --example protocol_greetings -- --addr <addr> --jwt <jwt>`
/// with stdout and stderr discarded and returns the child handle without waiting.
///
/// # Panics
/// Panics if `cargo` cannot be spawned.
fn run_protocol_greetings(addr: &str, jwt: &str) -> Child {
    let mut cargo = Command::new("cargo");
    cargo
        .args(["run", "--example", "protocol_greetings"])
        // Everything after `--` is handed to the example itself.
        .arg("--")
        .args(["--addr", addr])
        .args(["--jwt", jwt])
        .current_dir("./")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    cargo.spawn().unwrap()
}

/// Runs `cargo run --example get_next_greeting_message <addr> <jwt> <now>`, waits
/// for it to finish and returns what it printed on stdout with one trailing line
/// ending (`\n` or `\r\n`) removed.
///
/// Output that does not end in a newline is returned unchanged, so no real
/// character is cut off. Invalid UTF-8 is replaced lossily.
///
/// # Panics
/// Panics if `cargo` cannot be executed.
fn get_next_greeting_message(addr: &str, jwt: &str, now: i64) -> String {
    let res = Command::new("cargo")
        .args([
            "run",
            "--example",
            "get_next_greeting_message",
            addr,
            jwt,
            &now.to_string(),
        ])
        .current_dir("./")
        .output()
        .unwrap();
    let stdout = String::from_utf8_lossy(&res.stdout);
    trim_line_ending(&stdout).to_string()
}

/// Removes a single trailing `\n` or `\r\n` from `text`, if present.
fn trim_line_ending(text: &str) -> &str {
    match text.strip_suffix('\n') {
        Some(line) => line.strip_suffix('\r').unwrap_or(line),
        None => text,
    }
}