Skip to main content

goosefs_sdk/
context.rs

1//! `FileSystemContext` — shared connection pool and routing context.
2//!
3//! This module implements the **three-layer connection architecture** that
4//! eliminates repeated TCP+SASL handshakes:
5//!
6//! ```text
7//! Layer 3: FileSystemContext — lifecycle manager + unified acquisition API
8//!          │
9//!          ├── Arc<MasterClient>         — persistent Master gRPC channel
10//!          ├── Arc<WorkerManagerClient>  — persistent WorkerMgr gRPC channel
11//!          ├── Arc<WorkerClientPool>     — shared Worker connection pool
12//!          └── Arc<WorkerRouter>         — shared consistent-hash router
13//!
14//! Layer 2: WorkerClientPool / WorkerRouter — connection & routing management
15//!
16//! Layer 1: MasterClient / WorkerManagerClient / WorkerClient — gRPC stubs
17//! ```
18//!
19//! # Before vs After
20//!
21//! | Operation | Before (per-call) | After (shared) |
22//! |-----------|-------------------|----------------|
23//! | `BaseFileSystem::get_status()` | 1 TCP+SASL | 0 (reused) |
24//! | `GoosefsFileInStream::open()` | 2 TCP+SASL | 0 (reused) |
25//! | Reading N blocks | N TCP connects | ~N_workers (pooled) |
26//!
27//! # Usage
28//!
29//! ```rust,no_run
30//! use goosefs_sdk::context::FileSystemContext;
31//! use goosefs_sdk::config::GoosefsConfig;
32//! use goosefs_sdk::fs::FileSystem; // needed to call trait methods
33//!
34//! # async fn example() -> goosefs_sdk::error::Result<()> {
35//! // Build once, share across all operations
36//! let ctx = FileSystemContext::connect(GoosefsConfig::new("127.0.0.1:9200")).await?;
37//!
38//! // Pass ctx into filesystem operations
39//! use goosefs_sdk::fs::BaseFileSystem;
40//! let fs = BaseFileSystem::from_context(ctx.clone());
41//! let status = fs.get_status("/data/file.parquet").await?;
42//! # Ok(())
43//! # }
44//! ```
45
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::sync::Arc;
48use std::time::Duration;
49
50use tokio::sync::Mutex;
51use tracing::{debug, warn};
52
53use crate::block::router::WorkerRouter;
54use crate::client::{
55    create_master_inquire_client, MasterClient, MasterInquireClient, WorkerClientPool,
56    WorkerManagerClient,
57};
58use crate::config::{ConfigRefresher, GoosefsConfig, TransparentAccelerationSwitch};
59use crate::error::{Error, Result};
60
/// Sleep period of the background worker-list refresh loop.
///
/// After each sleep the loop asks the `WorkerRouter` whether its worker list
/// is stale. The value is 30s, the same as `DEFAULT_WORKER_REFRESH_TTL` in
/// `WorkerRouter`.
const REFRESH_CHECK_INTERVAL: Duration = Duration::from_secs(30);
64
/// Sleep period of the background config refresh loop (60s).
///
/// Follows the 60s default of Java's `refreshInterval` in
/// `NamespaceRefreshThread`. It is deliberately a different constant from
/// [`REFRESH_CHECK_INTERVAL`]: config reloading and worker-list refreshing
/// each run on their own cadence.
const CONFIG_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
71
72/// Shared connection context for Goosefs filesystem operations.
73///
74/// A single `FileSystemContext` instance should be created per Goosefs cluster
75/// and shared across all `BaseFileSystem`, `GoosefsFileInStream`, and
76/// `GoosefsFileWriter` instances that connect to that cluster.
77///
78/// The context owns:
79/// - One persistent gRPC channel to the Master
80/// - One persistent gRPC channel to the WorkerManager service
81/// - One `WorkerClientPool` shared across all readers and writers
82/// - One `WorkerRouter` that tracks live workers and routes block reads
83pub struct FileSystemContext {
84    config: Arc<GoosefsConfig>,
85
86    /// Persistent Master gRPC connection (metadata RPCs).
87    master: Arc<MasterClient>,
88
89    /// Persistent WorkerManager gRPC connection (`GetWorkerInfoList`).
90    worker_manager: Arc<WorkerManagerClient>,
91
92    /// Worker gRPC connection pool — shared across all readers and writers.
93    worker_pool: Arc<WorkerClientPool>,
94
95    /// Consistent-hash router with TTL refresh and local-first preference.
96    worker_router: Arc<WorkerRouter>,
97
98    /// HA Master address discovery client (shared between master + wm).
99    inquire_client: Arc<dyn MasterInquireClient>,
100
101    /// Periodic config refresher — reloads `goosefs-site.properties` when
102    /// expired and updates the transparent acceleration switch flags.
103    ///
104    /// Mirrors Java's `ConfigurationUtils.loadIfExpire()` +
105    /// `AbstractCompatibleFileSystem.refreshTransparentAccelerationSwitch()`.
106    config_refresher: Arc<ConfigRefresher>,
107
108    /// Set to `true` once `close()` has been called.
109    closed: Arc<AtomicBool>,
110
111    /// Handle to the background worker-list TTL-refresh task.
112    /// Aborted on `close()`.
113    worker_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
114
115    /// Handle to the background config refresh task.
116    /// Aborted on `close()`.
117    config_refresh_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
118}
119
120impl FileSystemContext {
121    // ── Construction ────────────────────────────────────────────────────────
122
123    /// Build a `FileSystemContext` by connecting to the Goosefs cluster.
124    ///
125    /// Establishes persistent connections to the Master and WorkerManager,
126    /// fetches the initial worker list, and starts a background refresh task.
127    ///
128    /// This is the **only** call that performs network I/O.  All subsequent
129    /// operations on the context are zero-cost Arc clones.
130    pub async fn connect(config: GoosefsConfig) -> Result<Arc<Self>> {
131        let config = Arc::new(config);
132
133        // Build a shared inquire client so Master + WorkerManager both use the
134        // same singleflight-deduped HA discovery.
135        let inquire_client = create_master_inquire_client(&config);
136
137        // Connect Master and WorkerManager in parallel.
138        let (master_res, wm_res) = tokio::join!(
139            MasterClient::connect_with_inquire(&config, inquire_client.clone()),
140            WorkerManagerClient::connect_with_inquire(&config, inquire_client.clone()),
141        );
142        let master = Arc::new(master_res?);
143        let worker_manager = Arc::new(wm_res?);
144
145        // Fetch the initial worker list.
146        let workers = worker_manager.get_worker_info_list().await?;
147        if workers.is_empty() {
148            return Err(Error::NoWorkerAvailable {
149                message: "no workers available at startup".to_string(),
150            });
151        }
152        debug!(count = workers.len(), "initial worker list fetched");
153
154        // Build the router with failure/refresh TTLs.
155        let worker_router = Arc::new(WorkerRouter::with_ttls(
156            Duration::from_secs(60), // failure_ttl
157            Duration::from_secs(30), // worker_refresh_ttl (matches Go SDK)
158        ));
159        worker_router.update_workers(workers).await;
160
161        // Build the shared worker connection pool.
162        let worker_pool = WorkerClientPool::new_shared((*config).clone());
163
164        let ctx = Arc::new(Self {
165            config: config.clone(),
166            master,
167            worker_manager,
168            worker_pool,
169            worker_router,
170            inquire_client,
171            config_refresher: Arc::new(ConfigRefresher::from_config(&config)),
172            closed: Arc::new(AtomicBool::new(false)),
173            worker_refresh_task: Mutex::new(None),
174            config_refresh_task: Mutex::new(None),
175        });
176
177        // Start the background worker-list refresh loop.
178        ctx.clone().start_worker_refresh_task().await;
179        // Start the background config refresh loop (separate cadence).
180        ctx.clone().start_config_refresh_task().await;
181
182        Ok(ctx)
183    }
184
185    // ── Acquisition API ──────────────────────────────────────────────────────
186
187    /// Return the shared `MasterClient` (zero-cost Arc clone).
188    pub fn acquire_master(&self) -> Arc<MasterClient> {
189        self.master.clone()
190    }
191
192    /// Return the shared `WorkerManagerClient` (zero-cost Arc clone).
193    pub fn acquire_worker_manager(&self) -> Arc<WorkerManagerClient> {
194        self.worker_manager.clone()
195    }
196
197    /// Return the shared `WorkerClientPool` (zero-cost Arc clone).
198    pub fn acquire_worker_pool(&self) -> Arc<WorkerClientPool> {
199        self.worker_pool.clone()
200    }
201
202    /// Return the shared `WorkerRouter` (zero-cost Arc clone).
203    pub fn acquire_router(&self) -> Arc<WorkerRouter> {
204        self.worker_router.clone()
205    }
206
207    /// Return the shared `MasterInquireClient` (zero-cost Arc clone).
208    pub fn acquire_inquire_client(&self) -> Arc<dyn MasterInquireClient> {
209        self.inquire_client.clone()
210    }
211
212    /// Return the configuration used to build this context.
213    pub fn config(&self) -> &GoosefsConfig {
214        &self.config
215    }
216
217    /// Return the shared `ConfigRefresher` (zero-cost Arc clone).
218    ///
219    /// Use this to query the current transparent acceleration switch values
220    /// or to trigger a config reload check.
221    pub fn acquire_config_refresher(&self) -> Arc<ConfigRefresher> {
222        self.config_refresher.clone()
223    }
224
225    /// Refresh the transparent acceleration switch by reloading config if expired.
226    ///
227    /// Convenience wrapper around `ConfigRefresher::refresh_transparent_acceleration_switch()`.
228    /// Mirrors Java's `AbstractCompatibleFileSystem.refreshTransparentAccelerationSwitch()`.
229    pub fn refresh_transparent_acceleration_switch(&self) -> TransparentAccelerationSwitch {
230        self.config_refresher
231            .refresh_transparent_acceleration_switch()
232    }
233
234    // ── Lifecycle ────────────────────────────────────────────────────────────
235
236    /// Gracefully shut down the context.
237    ///
238    /// Aborts the background refresh task and marks the context as closed.
239    /// Idempotent — safe to call multiple times.
240    pub async fn close(&self) -> Result<()> {
241        if self.closed.swap(true, Ordering::SeqCst) {
242            return Ok(()); // Already closed.
243        }
244
245        // Cancel the background refresh tasks.
246        let worker_handle = self.worker_refresh_task.lock().await.take();
247        if let Some(h) = worker_handle {
248            h.abort();
249            debug!("worker refresh task aborted");
250        }
251        let config_handle = self.config_refresh_task.lock().await.take();
252        if let Some(h) = config_handle {
253            h.abort();
254            debug!("config refresh task aborted");
255        }
256
257        Ok(())
258    }
259
260    /// Return `true` if `close()` has been called.
261    pub fn is_closed(&self) -> bool {
262        self.closed.load(Ordering::SeqCst)
263    }
264
265    // ── Background refresh ────────────────────────────────────────────────────
266
267    /// Start the background worker-list TTL-refresh loop.
268    ///
269    /// The loop wakes every [`REFRESH_CHECK_INTERVAL`] seconds, calls
270    /// [`WorkerRouter::needs_refresh`], and if stale triggers
271    /// [`WorkerRouter::refresh_workers`].
272    async fn start_worker_refresh_task(self: Arc<Self>) {
273        let worker_router = self.worker_router.clone();
274        let worker_manager = self.worker_manager.clone();
275        let closed = self.closed.clone();
276
277        let handle = tokio::spawn(async move {
278            loop {
279                tokio::time::sleep(REFRESH_CHECK_INTERVAL).await;
280
281                // Stop if the context has been closed.
282                if closed.load(Ordering::SeqCst) {
283                    debug!("worker refresh task: context closed, exiting");
284                    break;
285                }
286
287                // Refresh worker list if stale.
288                if worker_router.needs_refresh().await {
289                    if let Err(e) = worker_router.refresh_workers(&worker_manager).await {
290                        warn!("worker refresh failed: {}", e);
291                        // refresh_workers already resets the TTL clock to avoid
292                        // hammering on repeated failures (stale-while-revalidate).
293                    } else {
294                        debug!("worker list refreshed by background task");
295                    }
296                }
297            }
298        });
299
300        *self.worker_refresh_task.lock().await = Some(handle);
301    }
302
303    /// Start the background config refresh loop.
304    ///
305    /// On first invocation the task **immediately** loads the config from
306    /// `goosefs-site.properties` (via `refresh_transparent_acceleration_switch`)
307    /// so that the transparent acceleration switches are up-to-date right after
308    /// `connect()` returns.  Subsequent refreshes happen every
309    /// [`CONFIG_REFRESH_INTERVAL`] seconds (default 60s, matching Java's
310    /// `refreshInterval`).
311    ///
312    /// This runs independently from the worker-list refresh task.
313    async fn start_config_refresh_task(self: Arc<Self>) {
314        let config_refresher = self.config_refresher.clone();
315        let closed = self.closed.clone();
316
317        let handle = tokio::spawn(async move {
318            // Eagerly load config on startup so the switches are current
319            // before any file-system operation is issued.
320            let switch = config_refresher.refresh_transparent_acceleration_switch();
321            debug!(
322                transparent_acceleration_enabled = switch.enabled,
323                cosranger_enabled = switch.cosranger_enabled,
324                "config refresh: initial load completed"
325            );
326
327            loop {
328                tokio::time::sleep(CONFIG_REFRESH_INTERVAL).await;
329
330                // Stop if the context has been closed.
331                if closed.load(Ordering::SeqCst) {
332                    debug!("config refresh task: context closed, exiting");
333                    break;
334                }
335
336                // Refresh transparent acceleration switch (reload config if expired).
337                // Mirrors Java's NamespaceRefreshThread calling
338                // refreshTransparentAccelerationSwitch() each loop iteration.
339                let switch = config_refresher.refresh_transparent_acceleration_switch();
340                debug!(
341                    transparent_acceleration_enabled = switch.enabled,
342                    cosranger_enabled = switch.cosranger_enabled,
343                    "config refresh check completed"
344                );
345            }
346        });
347
348        *self.config_refresh_task.lock().await = Some(handle);
349    }
350}
351
352impl Drop for FileSystemContext {
353    fn drop(&mut self) {
354        // Signal `closed` first so any in-flight background tasks that poll
355        // this flag stop themselves before we try to abort their handles.
356        // This avoids a race where a task wakes up between our abort() call
357        // and the actual cancellation and touches shared state.
358        self.closed.store(true, Ordering::SeqCst);
359
360        // Best-effort abort of the refresh tasks.
361        // `drop` is synchronous, so we use `try_lock`; if we cannot obtain the
362        // lock the task loop will observe `closed == true` on its next iteration
363        // and exit on its own.
364        if let Ok(mut guard) = self.worker_refresh_task.try_lock() {
365            if let Some(h) = guard.take() {
366                h.abort();
367            }
368        }
369        if let Ok(mut guard) = self.config_refresh_task.try_lock() {
370            if let Some(h) = guard.take() {
371                h.abort();
372            }
373        }
374    }
375}
376
377// ---------------------------------------------------------------------------
378// Tests
379// ---------------------------------------------------------------------------
380
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A flag built the way `connect()` builds `closed` starts out unset.
    /// (No network is available here, so only the flag itself is exercised.)
    #[test]
    fn test_context_closed_starts_false() {
        let flag = Arc::new(AtomicBool::new(false));
        let is_closed = flag.load(Ordering::SeqCst);
        assert!(!is_closed);
    }

    /// Swapping `true` into the flag twice: only the first swap sees it unset.
    #[test]
    fn test_context_close_is_idempotent() {
        let flag = Arc::new(AtomicBool::new(false));

        // First close: the previous value is `false`.
        let previously_closed = flag.swap(true, Ordering::SeqCst);
        assert!(!previously_closed);

        // Second close: the flag was already set, so this is a no-op.
        let previously_closed = flag.swap(true, Ordering::SeqCst);
        assert!(previously_closed);
    }

    /// The worker refresh check interval is 30s.
    #[test]
    fn test_refresh_check_interval() {
        let expected = Duration::from_secs(30);
        assert_eq!(REFRESH_CHECK_INTERVAL, expected);
    }

    /// The config refresh interval is 60s (matching Java's refreshInterval).
    #[test]
    fn test_config_refresh_interval() {
        let expected = Duration::from_secs(60);
        assert_eq!(CONFIG_REFRESH_INTERVAL, expected);
    }

    /// `WorkerRouter::with_ttls` accepts the TTL values the context passes.
    #[test]
    fn test_worker_router_ttls_accepted() {
        let failure_ttl = Duration::from_secs(60);
        let worker_refresh_ttl = Duration::from_secs(30);
        // Construction without a panic is the whole check: fields are private.
        let _router = WorkerRouter::with_ttls(failure_ttl, worker_refresh_ttl);
    }
}