goosefs_sdk/context.rs
1//! `FileSystemContext` — shared connection pool and routing context.
2//!
3//! This module implements the **three-layer connection architecture** that
4//! eliminates repeated TCP+SASL handshakes:
5//!
6//! ```text
7//! Layer 3: FileSystemContext — lifecycle manager + unified acquisition API
8//! │
9//! ├── Arc<MasterClient> — persistent Master gRPC channel
10//! ├── Arc<WorkerManagerClient> — persistent WorkerMgr gRPC channel
11//! ├── Arc<WorkerClientPool> — shared Worker connection pool
12//! └── Arc<WorkerRouter> — shared consistent-hash router
13//!
14//! Layer 2: WorkerClientPool / WorkerRouter — connection & routing management
15//!
16//! Layer 1: MasterClient / WorkerManagerClient / WorkerClient — gRPC stubs
17//! ```
18//!
19//! # Before vs After
20//!
21//! | Operation | Before (per-call) | After (shared) |
22//! |-----------|-------------------|----------------|
23//! | `BaseFileSystem::get_status()` | 1 TCP+SASL | 0 (reused) |
24//! | `GoosefsFileInStream::open()` | 2 TCP+SASL | 0 (reused) |
25//! | Reading N blocks | N TCP connects | ~N_workers (pooled) |
26//!
27//! # Usage
28//!
29//! ```rust,no_run
30//! use goosefs_sdk::context::FileSystemContext;
31//! use goosefs_sdk::config::GoosefsConfig;
32//! use goosefs_sdk::fs::FileSystem; // needed to call trait methods
33//!
34//! # async fn example() -> goosefs_sdk::error::Result<()> {
35//! // Build once, share across all operations
36//! let ctx = FileSystemContext::connect(GoosefsConfig::new("127.0.0.1:9200")).await?;
37//!
38//! // Pass ctx into filesystem operations
39//! use goosefs_sdk::fs::BaseFileSystem;
40//! let fs = BaseFileSystem::from_context(ctx.clone());
41//! let status = fs.get_status("/data/file.parquet").await?;
42//! # Ok(())
43//! # }
44//! ```
45
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::sync::Arc;
48use std::time::Duration;
49
50use tokio::sync::Mutex;
51use tracing::{debug, warn};
52
53use crate::block::router::WorkerRouter;
54use crate::client::{
55 create_master_inquire_client, MasterClient, MasterInquireClient, WorkerClientPool,
56 WorkerManagerClient,
57};
58use crate::config::{ConfigRefresher, GoosefsConfig, TransparentAccelerationSwitch};
59use crate::error::{Error, Result};
60
/// Sleep period of the background worker-list refresh loop.
///
/// After every sleep the loop asks the `WorkerRouter` whether its worker list
/// is stale and refreshes it only if so. The value is kept equal to the 30s
/// worker refresh TTL that `FileSystemContext::connect` passes to the router.
const REFRESH_CHECK_INTERVAL: Duration = Duration::from_secs(30);
64
/// Sleep period of the background config refresh loop (60s).
///
/// Chosen to mirror the default `refreshInterval` (60s) of Java's
/// `NamespaceRefreshThread`. It is deliberately a separate constant from
/// [`REFRESH_CHECK_INTERVAL`]: config reloading and worker-list refreshing
/// each run on their own cadence.
const CONFIG_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
71
72/// Shared connection context for Goosefs filesystem operations.
73///
74/// A single `FileSystemContext` instance should be created per Goosefs cluster
75/// and shared across all `BaseFileSystem`, `GoosefsFileInStream`, and
76/// `GoosefsFileWriter` instances that connect to that cluster.
77///
78/// The context owns:
79/// - One persistent gRPC channel to the Master
80/// - One persistent gRPC channel to the WorkerManager service
81/// - One `WorkerClientPool` shared across all readers and writers
82/// - One `WorkerRouter` that tracks live workers and routes block reads
83pub struct FileSystemContext {
84 config: Arc<GoosefsConfig>,
85
86 /// Persistent Master gRPC connection (metadata RPCs).
87 master: Arc<MasterClient>,
88
89 /// Persistent WorkerManager gRPC connection (`GetWorkerInfoList`).
90 worker_manager: Arc<WorkerManagerClient>,
91
92 /// Worker gRPC connection pool — shared across all readers and writers.
93 worker_pool: Arc<WorkerClientPool>,
94
95 /// Consistent-hash router with TTL refresh and local-first preference.
96 worker_router: Arc<WorkerRouter>,
97
98 /// HA Master address discovery client (shared between master + wm).
99 inquire_client: Arc<dyn MasterInquireClient>,
100
101 /// Periodic config refresher — reloads `goosefs-site.properties` when
102 /// expired and updates the transparent acceleration switch flags.
103 ///
104 /// Mirrors Java's `ConfigurationUtils.loadIfExpire()` +
105 /// `AbstractCompatibleFileSystem.refreshTransparentAccelerationSwitch()`.
106 config_refresher: Arc<ConfigRefresher>,
107
108 /// Set to `true` once `close()` has been called.
109 closed: Arc<AtomicBool>,
110
111 /// Handle to the background worker-list TTL-refresh task.
112 /// Aborted on `close()`.
113 worker_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
114
115 /// Handle to the background config refresh task.
116 /// Aborted on `close()`.
117 config_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
118}
119
120impl FileSystemContext {
121 // ── Construction ────────────────────────────────────────────────────────
122
123 /// Build a `FileSystemContext` by connecting to the Goosefs cluster.
124 ///
125 /// Establishes persistent connections to the Master and WorkerManager,
126 /// fetches the initial worker list, and starts a background refresh task.
127 ///
128 /// This is the **only** call that performs network I/O. All subsequent
129 /// operations on the context are zero-cost Arc clones.
130 pub async fn connect(config: GoosefsConfig) -> Result<Arc<Self>> {
131 let config = Arc::new(config);
132
133 // Build a shared inquire client so Master + WorkerManager both use the
134 // same singleflight-deduped HA discovery.
135 let inquire_client = create_master_inquire_client(&config);
136
137 // Connect Master and WorkerManager in parallel.
138 let (master_res, wm_res) = tokio::join!(
139 MasterClient::connect_with_inquire(&config, inquire_client.clone()),
140 WorkerManagerClient::connect_with_inquire(&config, inquire_client.clone()),
141 );
142 let master = Arc::new(master_res?);
143 let worker_manager = Arc::new(wm_res?);
144
145 // Fetch the initial worker list.
146 let workers = worker_manager.get_worker_info_list().await?;
147 if workers.is_empty() {
148 return Err(Error::NoWorkerAvailable {
149 message: "no workers available at startup".to_string(),
150 });
151 }
152 debug!(count = workers.len(), "initial worker list fetched");
153
154 // Build the router with failure/refresh TTLs.
155 let worker_router = Arc::new(WorkerRouter::with_ttls(
156 Duration::from_secs(60), // failure_ttl
157 Duration::from_secs(30), // worker_refresh_ttl (matches Go SDK)
158 ));
159 worker_router.update_workers(workers).await;
160
161 // Build the shared worker connection pool.
162 let worker_pool = WorkerClientPool::new_shared((*config).clone());
163
164 let ctx = Arc::new(Self {
165 config: config.clone(),
166 master,
167 worker_manager,
168 worker_pool,
169 worker_router,
170 inquire_client,
171 config_refresher: Arc::new(ConfigRefresher::from_config(&config)),
172 closed: Arc::new(AtomicBool::new(false)),
173 worker_refresh_task: Mutex::new(None),
174 config_refresh_task: Mutex::new(None),
175 });
176
177 // Start the background worker-list refresh loop.
178 ctx.clone().start_worker_refresh_task().await;
179 // Start the background config refresh loop (separate cadence).
180 ctx.clone().start_config_refresh_task().await;
181
182 Ok(ctx)
183 }
184
185 // ── Acquisition API ──────────────────────────────────────────────────────
186
187 /// Return the shared `MasterClient` (zero-cost Arc clone).
188 pub fn acquire_master(&self) -> Arc<MasterClient> {
189 self.master.clone()
190 }
191
192 /// Return the shared `WorkerManagerClient` (zero-cost Arc clone).
193 pub fn acquire_worker_manager(&self) -> Arc<WorkerManagerClient> {
194 self.worker_manager.clone()
195 }
196
197 /// Return the shared `WorkerClientPool` (zero-cost Arc clone).
198 pub fn acquire_worker_pool(&self) -> Arc<WorkerClientPool> {
199 self.worker_pool.clone()
200 }
201
202 /// Return the shared `WorkerRouter` (zero-cost Arc clone).
203 pub fn acquire_router(&self) -> Arc<WorkerRouter> {
204 self.worker_router.clone()
205 }
206
207 /// Return the shared `MasterInquireClient` (zero-cost Arc clone).
208 pub fn acquire_inquire_client(&self) -> Arc<dyn MasterInquireClient> {
209 self.inquire_client.clone()
210 }
211
212 /// Return the configuration used to build this context.
213 pub fn config(&self) -> &GoosefsConfig {
214 &self.config
215 }
216
217 /// Return the shared `ConfigRefresher` (zero-cost Arc clone).
218 ///
219 /// Use this to query the current transparent acceleration switch values
220 /// or to trigger a config reload check.
221 pub fn acquire_config_refresher(&self) -> Arc<ConfigRefresher> {
222 self.config_refresher.clone()
223 }
224
225 /// Refresh the transparent acceleration switch by reloading config if expired.
226 ///
227 /// Convenience wrapper around `ConfigRefresher::refresh_transparent_acceleration_switch()`.
228 /// Mirrors Java's `AbstractCompatibleFileSystem.refreshTransparentAccelerationSwitch()`.
229 pub fn refresh_transparent_acceleration_switch(&self) -> TransparentAccelerationSwitch {
230 self.config_refresher
231 .refresh_transparent_acceleration_switch()
232 }
233
234 // ── Lifecycle ────────────────────────────────────────────────────────────
235
236 /// Gracefully shut down the context.
237 ///
238 /// Aborts the background refresh task and marks the context as closed.
239 /// Idempotent — safe to call multiple times.
240 pub async fn close(&self) -> Result<()> {
241 if self.closed.swap(true, Ordering::SeqCst) {
242 return Ok(()); // Already closed.
243 }
244
245 // Cancel the background refresh tasks.
246 let worker_handle = self.worker_refresh_task.lock().await.take();
247 if let Some(h) = worker_handle {
248 h.abort();
249 debug!("worker refresh task aborted");
250 }
251 let config_handle = self.config_refresh_task.lock().await.take();
252 if let Some(h) = config_handle {
253 h.abort();
254 debug!("config refresh task aborted");
255 }
256
257 Ok(())
258 }
259
260 /// Return `true` if `close()` has been called.
261 pub fn is_closed(&self) -> bool {
262 self.closed.load(Ordering::SeqCst)
263 }
264
265 // ── Background refresh ────────────────────────────────────────────────────
266
267 /// Start the background worker-list TTL-refresh loop.
268 ///
269 /// The loop wakes every [`REFRESH_CHECK_INTERVAL`] seconds, calls
270 /// [`WorkerRouter::needs_refresh`], and if stale triggers
271 /// [`WorkerRouter::refresh_workers`].
272 async fn start_worker_refresh_task(self: Arc<Self>) {
273 let worker_router = self.worker_router.clone();
274 let worker_manager = self.worker_manager.clone();
275 let closed = self.closed.clone();
276
277 let handle = tokio::spawn(async move {
278 loop {
279 tokio::time::sleep(REFRESH_CHECK_INTERVAL).await;
280
281 // Stop if the context has been closed.
282 if closed.load(Ordering::SeqCst) {
283 debug!("worker refresh task: context closed, exiting");
284 break;
285 }
286
287 // Refresh worker list if stale.
288 if worker_router.needs_refresh().await {
289 if let Err(e) = worker_router.refresh_workers(&worker_manager).await {
290 warn!("worker refresh failed: {}", e);
291 // refresh_workers already resets the TTL clock to avoid
292 // hammering on repeated failures (stale-while-revalidate).
293 } else {
294 debug!("worker list refreshed by background task");
295 }
296 }
297 }
298 });
299
300 *self.worker_refresh_task.lock().await = Some(handle);
301 }
302
303 /// Start the background config refresh loop.
304 ///
305 /// On first invocation the task **immediately** loads the config from
306 /// `goosefs-site.properties` (via `refresh_transparent_acceleration_switch`)
307 /// so that the transparent acceleration switches are up-to-date right after
308 /// `connect()` returns. Subsequent refreshes happen every
309 /// [`CONFIG_REFRESH_INTERVAL`] seconds (default 60s, matching Java's
310 /// `refreshInterval`).
311 ///
312 /// This runs independently from the worker-list refresh task.
313 async fn start_config_refresh_task(self: Arc<Self>) {
314 let config_refresher = self.config_refresher.clone();
315 let closed = self.closed.clone();
316
317 let handle = tokio::spawn(async move {
318 // Eagerly load config on startup so the switches are current
319 // before any file-system operation is issued.
320 let switch = config_refresher.refresh_transparent_acceleration_switch();
321 debug!(
322 transparent_acceleration_enabled = switch.enabled,
323 cosranger_enabled = switch.cosranger_enabled,
324 "config refresh: initial load completed"
325 );
326
327 loop {
328 tokio::time::sleep(CONFIG_REFRESH_INTERVAL).await;
329
330 // Stop if the context has been closed.
331 if closed.load(Ordering::SeqCst) {
332 debug!("config refresh task: context closed, exiting");
333 break;
334 }
335
336 // Refresh transparent acceleration switch (reload config if expired).
337 // Mirrors Java's NamespaceRefreshThread calling
338 // refreshTransparentAccelerationSwitch() each loop iteration.
339 let switch = config_refresher.refresh_transparent_acceleration_switch();
340 debug!(
341 transparent_acceleration_enabled = switch.enabled,
342 cosranger_enabled = switch.cosranger_enabled,
343 "config refresh check completed"
344 );
345 }
346 });
347
348 *self.config_refresh_task.lock().await = Some(handle);
349 }
350}
351
352impl Drop for FileSystemContext {
353 fn drop(&mut self) {
354 // Signal `closed` first so any in-flight background tasks that poll
355 // this flag stop themselves before we try to abort their handles.
356 // This avoids a race where a task wakes up between our abort() call
357 // and the actual cancellation and touches shared state.
358 self.closed.store(true, Ordering::SeqCst);
359
360 // Best-effort abort of the refresh tasks.
361 // `drop` is synchronous, so we use `try_lock`; if we cannot obtain the
362 // lock the task loop will observe `closed == true` on its next iteration
363 // and exit on its own.
364 if let Ok(mut guard) = self.worker_refresh_task.try_lock() {
365 if let Some(h) = guard.take() {
366 h.abort();
367 }
368 }
369 if let Ok(mut guard) = self.config_refresh_task.try_lock() {
370 if let Some(h) = guard.take() {
371 h.abort();
372 }
373 }
374 }
375}
376
377// ---------------------------------------------------------------------------
378// Tests
379// ---------------------------------------------------------------------------
380
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A freshly created "closed" flag reads as `false`.
    ///
    /// No network is available here, so this exercises the same flag type
    /// the context uses rather than a real context.
    #[test]
    fn test_context_closed_starts_false() {
        let flag = Arc::new(AtomicBool::new(false));
        let is_closed = flag.load(Ordering::SeqCst);
        assert!(!is_closed);
    }

    /// The `swap(true)` used by `close()` reports "was open" exactly once.
    #[test]
    fn test_context_close_is_idempotent() {
        let flag = Arc::new(AtomicBool::new(false));

        // First close: the previous value is `false`, i.e. it was open.
        let first_call_was_open = !flag.swap(true, Ordering::SeqCst);
        assert!(first_call_was_open);

        // Second close: the flag is already set, so this is a no-op.
        let second_call_was_open = !flag.swap(true, Ordering::SeqCst);
        assert!(!second_call_was_open);
    }

    /// The worker refresh check interval is 30 seconds.
    #[test]
    fn test_refresh_check_interval() {
        let expected = Duration::from_secs(30);
        assert_eq!(REFRESH_CHECK_INTERVAL, expected);
    }

    /// The config refresh interval is 60 seconds (Java's `refreshInterval`).
    #[test]
    fn test_config_refresh_interval() {
        let expected = Duration::from_secs(60);
        assert_eq!(CONFIG_REFRESH_INTERVAL, expected);
    }

    /// `WorkerRouter::with_ttls` accepts the TTL values the context passes.
    #[test]
    fn test_worker_router_ttls_accepted() {
        let failure_ttl = Duration::from_secs(60);
        let refresh_ttl = Duration::from_secs(30);
        // Construction not panicking is the whole check; fields are private.
        let router = WorkerRouter::with_ttls(failure_ttl, refresh_ttl);
        drop(router);
    }
}