mod client;
mod config;
mod error;
mod protocol;
mod runtime;
mod storage;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use tracing::info;
#[allow(unused_imports)] use tracing::warn;
use crate::client::{KeyComputeClient, OllamaClient};
use crate::runtime::{heartbeat_loop, poll_loop, register_node, try_load_session, TaskExecutor};
use crate::storage::LocalStorage;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("node_token=info".parse()?),
)
.init();
info!("node-token starting");
let config = config::load_config()?;
info!("Configuration loaded successfully");
info!("Server URL: {}", config.server_url);
info!("Client instance ID: {}", config.client_instance_id);
info!("Display name: {}", config.display_name);
info!("Ollama URL: {}", config.ollama_url);
let client = Arc::new(KeyComputeClient::new(&config.server_url));
let ollama_client = Arc::new(OllamaClient::new(&config.ollama_url));
let storage = LocalStorage::new(config.data_dir.as_deref())?;
let session = match try_load_session(&storage)? {
Some(s) => {
info!("Loaded existing session, skipping registration");
s
}
None => {
info!("Registering new node");
register_node(&client, &ollama_client, &config, &storage).await?;
match try_load_session(&storage)? {
Some(s) => s,
None => {
return Err(anyhow::anyhow!(
"Failed to load session after successful registration"
));
}
}
}
};
client
.set_session_token(session.session_token.clone())
.await;
let is_excluded = Arc::new(AtomicBool::new(false));
let stop_signal = Arc::new(AtomicBool::new(false));
let heartbeat_client = client.clone();
let heartbeat_ollama = ollama_client.clone();
let heartbeat_session = session.clone();
let heartbeat_config = config.clone();
let heartbeat_excluded = is_excluded.clone();
let heartbeat_stop = stop_signal.clone();
let heartbeat_handle = tokio::spawn(async move {
heartbeat_loop(
&heartbeat_client,
&heartbeat_ollama,
&heartbeat_session,
&heartbeat_config,
heartbeat_excluded,
heartbeat_stop,
)
.await;
});
tokio::time::sleep(Duration::from_secs(2)).await;
let executor = Arc::new(TaskExecutor::new(
client.clone(),
ollama_client.clone(),
session.clone(),
));
let poll_client = client.clone();
let poll_session = session.clone();
let poll_executor = executor;
let poll_excluded = is_excluded.clone();
let poll_stop = stop_signal.clone();
let poll_excluded_check_interval =
Duration::from_secs(config.excluded_poll_check_interval_secs);
let poll_timeout_secs = session.poll_timeout_secs;
let poll_handle = tokio::spawn(async move {
poll_loop(
&poll_client,
&poll_session,
poll_executor,
poll_excluded,
poll_stop,
poll_excluded_check_interval,
poll_timeout_secs,
)
.await;
});
wait_for_signal().await;
info!("Received shutdown signal, stopping...");
stop_signal.store(true, Ordering::Relaxed);
let _ = tokio::join!(heartbeat_handle, poll_handle);
info!("Node token stopped");
Ok(())
}
/// Suspends the caller until the process is asked to shut down.
///
/// * Unix: resolves on the first of SIGTERM or SIGINT.
/// * Windows: resolves on CTRL+C.
/// * Any other platform: logs a warning and never resolves.
///
/// # Panics
///
/// Panics if the platform signal handlers cannot be installed.
async fn wait_for_signal() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm =
            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
        let mut sigint =
            signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");

        // Whichever signal arrives first ends the wait.
        tokio::select! {
            _ = sigterm.recv() => {
                info!("Received SIGTERM");
            }
            _ = sigint.recv() => {
                info!("Received SIGINT");
            }
        }
    }

    #[cfg(windows)]
    {
        use tokio::signal::windows;

        let mut ctrl_c = windows::ctrl_c().expect("failed to install CTRL+C handler");
        ctrl_c.recv().await;
        info!("Received CTRL+C");
    }

    #[cfg(not(any(unix, windows)))]
    {
        warn!("No signal handling on this platform, waiting indefinitely");
        // A timer-based wait is bounded by tokio's maximum sleep duration and
        // would eventually return; a pending future genuinely never resolves.
        std::future::pending::<()>().await;
    }
}