reflex/lifecycle/
manager.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::{Duration, Instant};
4
5use tokio::sync::RwLock;
6use tokio::time;
7
8use super::cloud::{CloudOps, GcpCloudOps, LocalCloudOps};
9use super::config::{
10 CloudProviderType, DEFAULT_SNAPSHOT_FILENAME, LifecycleConfig, REAPER_CHECK_INTERVAL_SECS,
11};
12use super::error::{LifecycleError, LifecycleResult};
13use super::types::{DehydrationResult, HydrationResult};
14
15pub struct LifecycleManager {
17 config: LifecycleConfig,
18 last_activity: Arc<RwLock<Instant>>,
19 shutdown_initiated: Arc<AtomicBool>,
20 reaper_running: Arc<AtomicBool>,
21 ops: Arc<dyn CloudOps>,
22}
23
24impl LifecycleManager {
25 pub fn new(config: LifecycleConfig) -> Self {
27 let ops: Arc<dyn CloudOps> = match config.cloud_provider {
28 CloudProviderType::Gcp => Arc::new(GcpCloudOps::new()),
29 CloudProviderType::Local => Arc::new(LocalCloudOps::new()),
30 };
31 Self {
32 config,
33 last_activity: Arc::new(RwLock::new(Instant::now())),
34 shutdown_initiated: Arc::new(AtomicBool::new(false)),
35 reaper_running: Arc::new(AtomicBool::new(false)),
36 ops,
37 }
38 }
39
40 pub fn new_with_ops(config: LifecycleConfig, ops: Arc<dyn CloudOps>) -> Self {
42 Self {
43 config,
44 last_activity: Arc::new(RwLock::new(Instant::now())),
45 shutdown_initiated: Arc::new(AtomicBool::new(false)),
46 reaper_running: Arc::new(AtomicBool::new(false)),
47 ops,
48 }
49 }
50
51 pub fn from_env() -> LifecycleResult<Self> {
53 Ok(Self::new(LifecycleConfig::from_env()?))
54 }
55
56 pub fn config(&self) -> &LifecycleConfig {
58 &self.config
59 }
60
61 pub async fn record_activity(&self) {
63 *self.last_activity.write().await = Instant::now();
64 }
65
66 pub async fn idle_duration(&self) -> Duration {
68 self.last_activity.read().await.elapsed()
69 }
70
71 pub async fn is_idle_timeout_exceeded(&self) -> bool {
73 self.idle_duration().await >= self.config.idle_timeout
74 }
75
76 pub fn is_shutdown_initiated(&self) -> bool {
78 self.shutdown_initiated.load(Ordering::Acquire)
81 }
82
83 pub async fn hydrate(&self) -> LifecycleResult<HydrationResult> {
85 if !self.config.has_gcs_bucket() {
86 return Ok(HydrationResult::Skipped {
87 reason: "GCS bucket not configured".to_string(),
88 });
89 }
90
91 let bucket = &self.config.gcs_bucket;
92 let object = DEFAULT_SNAPSHOT_FILENAME;
93 let local_path = &self.config.local_snapshot_path;
94
95 if let Some(parent) = local_path.parent() {
96 tokio::fs::create_dir_all(parent).await?;
97 }
98
99 match self.ops.download_file(bucket, object, local_path).await {
100 Ok(_) => {
101 let meta = tokio::fs::metadata(local_path).await?;
102 Ok(HydrationResult::Success { bytes: meta.len() })
103 }
104 Err(LifecycleError::CloudError(msg))
105 if msg.contains("No URLs matched") || msg.contains("not found") =>
106 {
107 Ok(HydrationResult::NotFound)
108 }
109 Err(e) => Err(e),
110 }
111 }
112
113 pub async fn dehydrate(&self) -> LifecycleResult<DehydrationResult> {
115 if !self.config.has_gcs_bucket() {
116 return Ok(DehydrationResult::Skipped {
117 reason: "GCS bucket not configured".to_string(),
118 });
119 }
120
121 let bucket = &self.config.gcs_bucket;
122 let object = DEFAULT_SNAPSHOT_FILENAME;
123 let local_path = &self.config.local_snapshot_path;
124
125 if !local_path.exists() {
126 return Ok(DehydrationResult::NoSnapshot);
127 }
128
129 self.ops.upload_file(bucket, object, local_path).await?;
130 let bytes = tokio::fs::metadata(local_path).await?.len();
131 Ok(DehydrationResult::Success { bytes })
132 }
133
134 pub fn start_reaper_thread(&self) -> tokio::task::JoinHandle<()> {
136 if self.reaper_running.swap(true, Ordering::AcqRel) {
139 return tokio::spawn(async {});
140 }
141
142 let config = self.config.clone();
143 let last_activity = Arc::clone(&self.last_activity);
144 let shutdown_initiated = Arc::clone(&self.shutdown_initiated);
145 let reaper_running = Arc::clone(&self.reaper_running);
146 let ops = Arc::clone(&self.ops);
147
148 tokio::spawn(async move {
149 let mut interval = time::interval(Duration::from_secs(REAPER_CHECK_INTERVAL_SECS));
150 loop {
151 interval.tick().await;
152 if shutdown_initiated.load(Ordering::Acquire) {
155 break;
156 }
157
158 let idle = last_activity.read().await.elapsed();
159 if idle >= config.idle_timeout {
160 shutdown_initiated.store(true, Ordering::Release);
163
164 if config.has_gcs_bucket() {
165 let bucket = &config.gcs_bucket;
166 let object = DEFAULT_SNAPSHOT_FILENAME;
167 let path = &config.local_snapshot_path;
168 if path.exists() {
169 let _ = ops.upload_file(bucket, object, path).await;
170 }
171 }
172
173 if config.enable_instance_stop {
174 let _ = ops.stop_self().await;
175 }
176 break;
177 }
178 }
179 reaper_running.store(false, Ordering::Release);
182 })
183 }
184
185 pub async fn shutdown(&self) -> LifecycleResult<()> {
187 if self.shutdown_initiated.swap(true, Ordering::AcqRel) {
190 return Ok(());
191 }
192 self.dehydrate().await.map(|_| ())
193 }
194}
195
196#[derive(Clone)]
197pub struct ActivityRecorder {
199 manager: Arc<LifecycleManager>,
200}
201
202impl ActivityRecorder {
203 pub fn new(manager: Arc<LifecycleManager>) -> Self {
205 Self { manager }
206 }
207
208 pub async fn record(&self) {
210 self.manager.record_activity().await;
211 }
212}