colink 0.3.10

CoLink Rust SDK
Documentation
use crate::{utils::get_colink_home, CoLink};
use rand::Rng;
use std::{
    fs::File,
    io::Write,
    path::Path,
    process::{Child, Command, Stdio},
};

pub struct InstantServer {
    id: String,
    port: i32,
    host_token: String,
    process: Child,
}

impl Drop for InstantServer {
    fn drop(&mut self) {
        Command::new("pkill")
            .arg("-9")
            .arg("-P")
            .arg(&self.process.id().to_string())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status()
            .unwrap();
        self.process.kill().unwrap();
        let colink_home = get_colink_home().unwrap();
        let working_dir = Path::new(&colink_home)
            .join("instant_servers")
            .join(self.id.clone());
        std::fs::remove_dir_all(working_dir).unwrap();
    }
}

impl Default for InstantServer {
    fn default() -> Self {
        Self::new()
    }
}

impl InstantServer {
    pub fn new() -> Self {
        InstantServer::new_with_config(
            r#"
            [policy_module]
            operator_num = 1
            [[policy_module.create_entry]]
            key_name = "_policy_module:init:accept_all_tasks"
            value = "true"
            
            [remote_storage]
            operator_num = 1
            
            [registry]
            operator_num = 1
            "#,
        )
    }

    pub fn new_with_config(user_init_config: &str) -> Self {
        let colink_home = get_colink_home().unwrap();
        let program = Path::new(&colink_home).join("colink-server");
        if std::fs::metadata(program.clone()).is_err() {
            Command::new("bash")
                .arg("-c")
                .arg("bash -c \"$(curl -fsSL https://raw.githubusercontent.com/CoLearn-Dev/colinkctl/main/install_colink.sh)\"")
                .env("COLINK_INSTALL_SERVER_ONLY", "true")
                .env("COLINK_INSTALL_SILENT", "true")
                .env("COLINK_SERVER_VERSION", "v0.3.5")
                .status()
                .unwrap();
        }
        let instant_server_id = uuid::Uuid::new_v4().to_string();
        let mut port = rand::thread_rng().gen_range(10000..20000);
        while std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).is_ok() {
            port = rand::thread_rng().gen_range(10000..20000);
        }
        let working_dir = Path::new(&colink_home)
            .join("instant_servers")
            .join(instant_server_id.clone());
        std::fs::create_dir_all(&working_dir).unwrap();
        let mut user_init_config_file =
            std::fs::File::create(Path::new(&working_dir).join("user_init_config.toml")).unwrap();
        user_init_config_file
            .write_all(user_init_config.as_bytes())
            .unwrap();
        let mq_uri = if std::env::var("COLINK_SERVER_MQ_URI").is_ok() {
            Some(std::env::var("COLINK_SERVER_MQ_URI").unwrap())
        } else {
            None
        };
        let mq_api = if std::env::var("COLINK_SERVER_MQ_API").is_ok() {
            Some(std::env::var("COLINK_SERVER_MQ_API").unwrap())
        } else {
            None
        };
        let (mq_uri, mq_api) = std::thread::spawn(move || {
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async {
                    if mq_uri.is_some() {
                        let mq_uri = mq_uri.clone().unwrap();
                        if mq_uri.starts_with("amqp") {
                            lapin::Connection::connect(
                                &mq_uri,
                                lapin::ConnectionProperties::default(),
                            )
                            .await
                            .unwrap();
                            if mq_api.is_some() {
                                let res = reqwest::get(&mq_api.clone().unwrap()).await.unwrap();
                                assert!(res.status() == hyper::StatusCode::OK);
                            }
                        } else if mq_uri.starts_with("redis") {
                            let client = redis::Client::open(mq_uri).unwrap();
                            let _con = client.get_async_connection().await.unwrap();
                        } else {
                            panic!("mq_uri({}) is not supported.", mq_uri);
                        }
                    }
                });
            (mq_uri, mq_api)
        })
        .join()
        .unwrap();
        let mut args = vec![
            "--address".to_string(),
            "0.0.0.0".to_string(),
            "--port".to_string(),
            port.to_string(),
            "--mq-prefix".to_string(),
            format!("colink-instant-server-{}", port),
            "--core-uri".to_string(),
            format!("http://127.0.0.1:{}", port),
            "--inter-core-reverse-mode".to_string(),
        ];
        if let Some(mq_uri) = mq_uri {
            args.push("--mq-uri".to_string());
            args.push(mq_uri);
        }
        if let Some(mq_api) = mq_api {
            args.push("--mq-api".to_string());
            args.push(mq_api);
        }
        let child = Command::new(program)
            .args(&args)
            .env("COLINK_HOME", colink_home)
            .current_dir(working_dir.clone())
            .spawn()
            .unwrap();
        loop {
            if std::fs::metadata(working_dir.join("host_token.txt")).is_ok()
                && std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).is_ok()
            {
                break;
            }
            std::thread::sleep(core::time::Duration::from_millis(10));
        }
        let host_token: String =
            String::from_utf8_lossy(&std::fs::read(working_dir.join("host_token.txt")).unwrap())
                .parse()
                .unwrap();
        Self {
            id: instant_server_id,
            port,
            host_token,
            process: child,
        }
    }

    pub fn get_colink(&self) -> CoLink {
        CoLink::new(&format!("http://127.0.0.1:{}", self.port), &self.host_token)
    }
}

pub struct InstantRegistry {
    _instant_server: InstantServer,
}

impl Drop for InstantRegistry {
    fn drop(&mut self) {
        let colink_home = get_colink_home().unwrap();
        let registry_file = Path::new(&colink_home).join("reg_config");
        std::fs::remove_file(registry_file).unwrap();
    }
}

impl Default for InstantRegistry {
    fn default() -> Self {
        Self::new()
    }
}

impl InstantRegistry {
    pub fn new() -> Self {
        let is = InstantServer::new();
        let colink_home = get_colink_home().unwrap();
        let registry_file = Path::new(&colink_home).join("reg_config");
        let _file = File::options()
            .write(true)
            .create_new(true)
            .open(registry_file)
            .unwrap();
        let is = std::thread::spawn(move || {
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async {
                    is.get_colink().switch_to_generated_user().await.unwrap();
                });
            is
        })
        .join()
        .unwrap();
        Self {
            _instant_server: is,
        }
    }
}