reflex/lifecycle/
manager.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::{Duration, Instant};
4
5use tokio::sync::RwLock;
6use tokio::time;
7
8use super::cloud::{CloudOps, GcpCloudOps, LocalCloudOps};
9use super::config::{
10    CloudProviderType, DEFAULT_SNAPSHOT_FILENAME, LifecycleConfig, REAPER_CHECK_INTERVAL_SECS,
11};
12use super::error::{LifecycleError, LifecycleResult};
13use super::types::{DehydrationResult, HydrationResult};
14
/// Manages hydrate/dehydrate and an idle "reaper" that can stop the instance.
///
/// The activity timestamp and both flags live behind `Arc` so the background
/// task spawned by `start_reaper_thread` can share them with the manager.
pub struct LifecycleManager {
    /// Settings read by the manager (bucket, snapshot path, idle timeout, instance stop).
    config: LifecycleConfig,
    /// Instant of the most recently recorded activity; idle time is measured from it.
    last_activity: Arc<RwLock<Instant>>,
    /// Raised once shutdown begins, either by `shutdown()` or by the reaper on idle timeout.
    shutdown_initiated: Arc<AtomicBool>,
    /// `true` while a reaper task is alive; used to avoid starting a second one.
    reaper_running: Arc<AtomicBool>,
    /// Cloud backend used to download/upload the snapshot and to stop the instance.
    ops: Arc<dyn CloudOps>,
}
23
24impl LifecycleManager {
25    /// Creates a manager using the configured cloud provider implementation.
26    pub fn new(config: LifecycleConfig) -> Self {
27        let ops: Arc<dyn CloudOps> = match config.cloud_provider {
28            CloudProviderType::Gcp => Arc::new(GcpCloudOps::new()),
29            CloudProviderType::Local => Arc::new(LocalCloudOps::new()),
30        };
31        Self {
32            config,
33            last_activity: Arc::new(RwLock::new(Instant::now())),
34            shutdown_initiated: Arc::new(AtomicBool::new(false)),
35            reaper_running: Arc::new(AtomicBool::new(false)),
36            ops,
37        }
38    }
39
40    /// Creates a manager with an explicit [`CloudOps`] implementation.
41    pub fn new_with_ops(config: LifecycleConfig, ops: Arc<dyn CloudOps>) -> Self {
42        Self {
43            config,
44            last_activity: Arc::new(RwLock::new(Instant::now())),
45            shutdown_initiated: Arc::new(AtomicBool::new(false)),
46            reaper_running: Arc::new(AtomicBool::new(false)),
47            ops,
48        }
49    }
50
51    /// Creates a manager from environment configuration.
52    pub fn from_env() -> LifecycleResult<Self> {
53        Ok(Self::new(LifecycleConfig::from_env()?))
54    }
55
56    /// Returns the active config.
57    pub fn config(&self) -> &LifecycleConfig {
58        &self.config
59    }
60
61    /// Records activity (resets the idle timer).
62    pub async fn record_activity(&self) {
63        *self.last_activity.write().await = Instant::now();
64    }
65
66    /// Returns the time since last activity.
67    pub async fn idle_duration(&self) -> Duration {
68        self.last_activity.read().await.elapsed()
69    }
70
71    /// Returns `true` if `idle_timeout` is exceeded.
72    pub async fn is_idle_timeout_exceeded(&self) -> bool {
73        self.idle_duration().await >= self.config.idle_timeout
74    }
75
    /// Returns `true` if shutdown has been initiated.
    ///
    /// The flag is raised by `shutdown()` or by the reaper task *before* either
    /// of them starts uploading the snapshot, so `true` means shutdown has
    /// begun, not that dehydration has finished.
    pub fn is_shutdown_initiated(&self) -> bool {
        // Acquire: pairs with the release-ordered writes that set this flag
        // (in shutdown() and in the reaper task), so anything written before
        // the flag was raised is visible once this load observes `true`.
        self.shutdown_initiated.load(Ordering::Acquire)
    }
82
83    /// Downloads a snapshot from cloud storage (if configured).
84    pub async fn hydrate(&self) -> LifecycleResult<HydrationResult> {
85        if !self.config.has_gcs_bucket() {
86            return Ok(HydrationResult::Skipped {
87                reason: "GCS bucket not configured".to_string(),
88            });
89        }
90
91        let bucket = &self.config.gcs_bucket;
92        let object = DEFAULT_SNAPSHOT_FILENAME;
93        let local_path = &self.config.local_snapshot_path;
94
95        if let Some(parent) = local_path.parent() {
96            tokio::fs::create_dir_all(parent).await?;
97        }
98
99        match self.ops.download_file(bucket, object, local_path).await {
100            Ok(_) => {
101                let meta = tokio::fs::metadata(local_path).await?;
102                Ok(HydrationResult::Success { bytes: meta.len() })
103            }
104            Err(LifecycleError::CloudError(msg))
105                if msg.contains("No URLs matched") || msg.contains("not found") =>
106            {
107                Ok(HydrationResult::NotFound)
108            }
109            Err(e) => Err(e),
110        }
111    }
112
113    /// Uploads the local snapshot to cloud storage (if configured and present).
114    pub async fn dehydrate(&self) -> LifecycleResult<DehydrationResult> {
115        if !self.config.has_gcs_bucket() {
116            return Ok(DehydrationResult::Skipped {
117                reason: "GCS bucket not configured".to_string(),
118            });
119        }
120
121        let bucket = &self.config.gcs_bucket;
122        let object = DEFAULT_SNAPSHOT_FILENAME;
123        let local_path = &self.config.local_snapshot_path;
124
125        if !local_path.exists() {
126            return Ok(DehydrationResult::NoSnapshot);
127        }
128
129        self.ops.upload_file(bucket, object, local_path).await?;
130        let bytes = tokio::fs::metadata(local_path).await?.len();
131        Ok(DehydrationResult::Success { bytes })
132    }
133
134    /// Starts the idle reaper background task (no-op if already running).
135    pub fn start_reaper_thread(&self) -> tokio::task::JoinHandle<()> {
136        // AcqRel: swap needs both load and store semantics to ensure only one
137        // reaper thread starts. Acquire sees prior stores, Release publishes our store.
138        if self.reaper_running.swap(true, Ordering::AcqRel) {
139            return tokio::spawn(async {});
140        }
141
142        let config = self.config.clone();
143        let last_activity = Arc::clone(&self.last_activity);
144        let shutdown_initiated = Arc::clone(&self.shutdown_initiated);
145        let reaper_running = Arc::clone(&self.reaper_running);
146        let ops = Arc::clone(&self.ops);
147
148        tokio::spawn(async move {
149            let mut interval = time::interval(Duration::from_secs(REAPER_CHECK_INTERVAL_SECS));
150            loop {
151                interval.tick().await;
152                // Acquire: synchronizes with the Release store from shutdown() to ensure
153                // we see any side effects that occurred before shutdown was initiated
154                if shutdown_initiated.load(Ordering::Acquire) {
155                    break;
156                }
157
158                let idle = last_activity.read().await.elapsed();
159                if idle >= config.idle_timeout {
160                    // Release: ensures the dehydration and stop operations that follow
161                    // are visible to any thread that later loads this flag with Acquire
162                    shutdown_initiated.store(true, Ordering::Release);
163
164                    if config.has_gcs_bucket() {
165                        let bucket = &config.gcs_bucket;
166                        let object = DEFAULT_SNAPSHOT_FILENAME;
167                        let path = &config.local_snapshot_path;
168                        if path.exists() {
169                            let _ = ops.upload_file(bucket, object, path).await;
170                        }
171                    }
172
173                    if config.enable_instance_stop {
174                        let _ = ops.stop_self().await;
175                    }
176                    break;
177                }
178            }
179            // Release: ensures all reaper work is visible before marking it as not running,
180            // so a subsequent start_reaper_thread() with Acquire sees the completed state
181            reaper_running.store(false, Ordering::Release);
182        })
183    }
184
185    /// Initiates shutdown (idempotent) and runs dehydration once.
186    pub async fn shutdown(&self) -> LifecycleResult<()> {
187        // AcqRel: swap needs both semantics - Acquire to see if already shut down,
188        // Release to publish shutdown state so other threads (reaper, callers) see it
189        if self.shutdown_initiated.swap(true, Ordering::AcqRel) {
190            return Ok(());
191        }
192        self.dehydrate().await.map(|_| ())
193    }
194}
195
/// Convenience wrapper to record activity without exposing the full manager.
///
/// Cloning is cheap: clones share the same manager through the `Arc`.
#[derive(Clone)]
pub struct ActivityRecorder {
    /// Shared manager whose idle timer is reset by `record`.
    manager: Arc<LifecycleManager>,
}
201
202impl ActivityRecorder {
203    /// Creates a new recorder for a manager.
204    pub fn new(manager: Arc<LifecycleManager>) -> Self {
205        Self { manager }
206    }
207
208    /// Records activity.
209    pub async fn record(&self) {
210        self.manager.record_activity().await;
211    }
212}