use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::block::router::WorkerRouter;
use crate::client::{
create_master_inquire_client, MasterClient, MasterInquireClient, WorkerClientPool,
WorkerManagerClient,
};
use crate::config::{ConfigRefresher, GooseFsConfig, TransparentAccelerationSwitch};
use crate::error::{Error, Result};
/// Sleep period of the background worker-refresh loop between "does the router need a
/// refresh?" checks.
const REFRESH_CHECK_INTERVAL: Duration = Duration::from_millis(30_000);
/// Sleep period of the background config-refresh loop between re-evaluations of the
/// transparent-acceleration switch.
const CONFIG_REFRESH_INTERVAL: Duration = Duration::from_millis(60_000);
/// Shared client context for a GooseFS file system.
///
/// Bundles the master / worker-manager clients, the worker client pool, the worker router
/// and the config refresher, plus the handles of the background refresh tasks started by
/// `connect`. Build it with `FileSystemContext::connect`.
pub struct FileSystemContext {
// Configuration this context was built from.
config: Arc<GooseFsConfig>,
// Client for the GooseFS master.
master: Arc<MasterClient>,
// Client used to fetch the worker list (initially and on background refresh).
worker_manager: Arc<WorkerManagerClient>,
// Pool of worker clients, created from a copy of `config`.
worker_pool: Arc<WorkerClientPool>,
// Router holding the current worker list; refreshed by the background worker task.
worker_router: Arc<WorkerRouter>,
// Master inquire client shared by `master` and `worker_manager`.
inquire_client: Arc<dyn MasterInquireClient>,
// Source of the transparent-acceleration switch; polled by the config refresh task.
config_refresher: Arc<ConfigRefresher>,
// Set once the context is closed (by `close` or `Drop`); background loops check it
// after each sleep and exit.
closed: Arc<AtomicBool>,
// Handle of the background worker-refresh task, taken and aborted on shutdown.
worker_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
// Handle of the background config-refresh task, taken and aborted on shutdown.
config_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
impl FileSystemContext {
pub async fn connect(config: GooseFsConfig) -> Result<Arc<Self>> {
let config = Arc::new(config);
let inquire_client = create_master_inquire_client(&config);
let (master_res, wm_res) = tokio::join!(
MasterClient::connect_with_inquire(&config, inquire_client.clone()),
WorkerManagerClient::connect_with_inquire(&config, inquire_client.clone()),
);
let master = Arc::new(master_res?);
let worker_manager = Arc::new(wm_res?);
let workers = worker_manager.get_worker_info_list().await?;
if workers.is_empty() {
return Err(Error::NoWorkerAvailable {
message: "no workers available at startup".to_string(),
});
}
debug!(count = workers.len(), "initial worker list fetched");
let worker_router = Arc::new(WorkerRouter::with_ttls(
Duration::from_secs(60), Duration::from_secs(30), ));
worker_router.update_workers(workers).await;
let worker_pool = WorkerClientPool::new_shared((*config).clone());
let ctx = Arc::new(Self {
config: config.clone(),
master,
worker_manager,
worker_pool,
worker_router,
inquire_client,
config_refresher: Arc::new(ConfigRefresher::from_config(&config)),
closed: Arc::new(AtomicBool::new(false)),
worker_refresh_task: Mutex::new(None),
config_refresh_task: Mutex::new(None),
});
ctx.clone().start_worker_refresh_task().await;
ctx.clone().start_config_refresh_task().await;
Ok(ctx)
}
pub fn acquire_master(&self) -> Arc<MasterClient> {
self.master.clone()
}
pub fn acquire_worker_manager(&self) -> Arc<WorkerManagerClient> {
self.worker_manager.clone()
}
pub fn acquire_worker_pool(&self) -> Arc<WorkerClientPool> {
self.worker_pool.clone()
}
pub fn acquire_router(&self) -> Arc<WorkerRouter> {
self.worker_router.clone()
}
pub fn acquire_inquire_client(&self) -> Arc<dyn MasterInquireClient> {
self.inquire_client.clone()
}
pub fn config(&self) -> &GooseFsConfig {
&self.config
}
pub fn acquire_config_refresher(&self) -> Arc<ConfigRefresher> {
self.config_refresher.clone()
}
pub fn refresh_transparent_acceleration_switch(&self) -> TransparentAccelerationSwitch {
self.config_refresher
.refresh_transparent_acceleration_switch()
}
pub async fn close(&self) -> Result<()> {
if self.closed.swap(true, Ordering::SeqCst) {
return Ok(()); }
let worker_handle = self.worker_refresh_task.lock().await.take();
if let Some(h) = worker_handle {
h.abort();
debug!("worker refresh task aborted");
}
let config_handle = self.config_refresh_task.lock().await.take();
if let Some(h) = config_handle {
h.abort();
debug!("config refresh task aborted");
}
Ok(())
}
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
async fn start_worker_refresh_task(self: Arc<Self>) {
let worker_router = self.worker_router.clone();
let worker_manager = self.worker_manager.clone();
let closed = self.closed.clone();
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(REFRESH_CHECK_INTERVAL).await;
if closed.load(Ordering::SeqCst) {
debug!("worker refresh task: context closed, exiting");
break;
}
if worker_router.needs_refresh().await {
if let Err(e) = worker_router.refresh_workers(&worker_manager).await {
warn!("worker refresh failed: {}", e);
} else {
debug!("worker list refreshed by background task");
}
}
}
});
*self.worker_refresh_task.lock().await = Some(handle);
}
async fn start_config_refresh_task(self: Arc<Self>) {
let config_refresher = self.config_refresher.clone();
let closed = self.closed.clone();
let handle = tokio::spawn(async move {
let switch = config_refresher.refresh_transparent_acceleration_switch();
debug!(
transparent_acceleration_enabled = switch.enabled,
cosranger_enabled = switch.cosranger_enabled,
"config refresh: initial load completed"
);
loop {
tokio::time::sleep(CONFIG_REFRESH_INTERVAL).await;
if closed.load(Ordering::SeqCst) {
debug!("config refresh task: context closed, exiting");
break;
}
let switch = config_refresher.refresh_transparent_acceleration_switch();
debug!(
transparent_acceleration_enabled = switch.enabled,
cosranger_enabled = switch.cosranger_enabled,
"config refresh check completed"
);
}
});
*self.config_refresh_task.lock().await = Some(handle);
}
}
impl Drop for FileSystemContext {
    /// Synchronous, best-effort shutdown when the context is dropped.
    ///
    /// Sets the closed flag and aborts any background task whose handle is still stored.
    /// After a successful `close` both slots are already empty, so this does nothing more.
    /// `drop` cannot await, so the tasks are only asked to cancel and are not waited for.
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
        // `&mut self` proves no lock guard is outstanding. `get_mut` therefore reaches the
        // slot without locking and cannot fail, unlike `try_lock`.
        if let Some(handle) = self.worker_refresh_task.get_mut().take() {
            handle.abort();
        }
        if let Some(handle) = self.config_refresh_task.get_mut().take() {
            handle.abort();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A closed flag built the way `connect` builds it starts out "open".
    #[test]
    fn test_context_closed_starts_false() {
        let flag = Arc::new(AtomicBool::new(false));
        let is_closed = flag.load(Ordering::SeqCst);
        assert!(!is_closed);
    }

    /// The swap pattern used by `close`: only the first swap sees the flag unset.
    #[test]
    fn test_context_close_is_idempotent() {
        let flag = Arc::new(AtomicBool::new(false));
        let first_saw_closed = flag.swap(true, Ordering::SeqCst);
        assert!(!first_saw_closed);
        let second_saw_closed = flag.swap(true, Ordering::SeqCst);
        assert!(second_saw_closed);
    }

    #[test]
    fn test_refresh_check_interval() {
        assert_eq!(REFRESH_CHECK_INTERVAL, Duration::from_millis(30_000));
    }

    #[test]
    fn test_config_refresh_interval() {
        assert_eq!(CONFIG_REFRESH_INTERVAL, Duration::from_millis(60_000));
    }

    /// The TTL pair passed by `connect` builds a router without panicking.
    #[test]
    fn test_worker_router_ttls_accepted() {
        drop(WorkerRouter::with_ttls(
            Duration::from_secs(60),
            Duration::from_secs(30),
        ));
    }
}