1mod signals;
4
5use std::future::{Future, IntoFuture};
6use std::ops::Deref;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15use crate::transport::{DropGuard, TransportClient};
16use crate::{EnsureSession, Result, RmuxError, Session, SessionName};
17use rmux_proto::{
18 CreateSessionLeaseRequest, KillSessionRequest, ReleaseSessionLeaseRequest,
19 RenewSessionLeaseRequest, Request, Response, CAPABILITY_SDK_SESSION_LEASE,
20};
21
22use super::Rmux;
23pub use signals::OwnedSessionSignalHandlers;
24
25const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(5);
26const MIN_LEASE_RENEW_INTERVAL: Duration = Duration::from_millis(100);
27const MAX_LEASE_RENEW_RETRY_INTERVAL: Duration = Duration::from_millis(250);
28
29#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
31#[non_exhaustive]
32pub enum CleanupPolicy {
33 #[default]
35 KillOnDrop,
36 KillOnOwnerExit,
38 Preserve,
40}
41
42#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
44#[non_exhaustive]
45pub enum LeaseState {
46 #[default]
49 NotLeased,
50 Active,
52 Lost,
54}
55
56#[derive(Debug)]
58pub struct OwnedSessionBuilder<'a> {
59 rmux: &'a Rmux,
60 name: SessionName,
61 replace_existing: bool,
62 cleanup_policy: CleanupPolicy,
63 lease_ttl: Duration,
64}
65
66impl<'a> OwnedSessionBuilder<'a> {
67 pub(crate) const fn new(rmux: &'a Rmux, name: SessionName) -> Self {
68 Self {
69 rmux,
70 name,
71 replace_existing: false,
72 cleanup_policy: CleanupPolicy::KillOnDrop,
73 lease_ttl: DEFAULT_LEASE_TTL,
74 }
75 }
76
77 #[must_use]
80 pub const fn replace_existing(mut self, replace_existing: bool) -> Self {
81 self.replace_existing = replace_existing;
82 self
83 }
84
85 #[must_use]
87 pub const fn cleanup_policy(mut self, cleanup_policy: CleanupPolicy) -> Self {
88 self.cleanup_policy = cleanup_policy;
89 self
90 }
91
92 #[must_use]
95 pub const fn lease_ttl(mut self, ttl: Duration) -> Self {
96 self.lease_ttl = ttl;
97 self
98 }
99
100 async fn run(self) -> Result<OwnedSession> {
101 if self.cleanup_policy == CleanupPolicy::KillOnOwnerExit {
102 validate_lease_ttl(self.lease_ttl)?;
103 }
104
105 if self.replace_existing {
106 match self.rmux.session(self.name.clone()).await {
107 Ok(session) => {
108 let _ = session.kill().await?;
109 }
110 Err(error) if is_missing_session(&error) => {}
111 Err(error) => return Err(error),
112 }
113 }
114
115 let session = self
116 .rmux
117 .ensure_session(EnsureSession::named(self.name).create_only().detached(true))
118 .await?;
119 let lease = if self.cleanup_policy == CleanupPolicy::KillOnOwnerExit {
120 Some(OwnedSessionLease::start(&session, self.lease_ttl).await?)
121 } else {
122 None
123 };
124 Ok(OwnedSession {
125 session: Some(session),
126 cleanup_policy: self.cleanup_policy,
127 lease,
128 signal_handlers_installed: Arc::new(AtomicBool::new(false)),
129 })
130 }
131}
132
133impl<'a> IntoFuture for OwnedSessionBuilder<'a> {
134 type Output = Result<OwnedSession>;
135 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
136
137 fn into_future(self) -> Self::IntoFuture {
138 Box::pin(self.run())
139 }
140}
141
142#[derive(Debug)]
144pub struct OwnedSession {
145 session: Option<Session>,
146 cleanup_policy: CleanupPolicy,
147 lease: Option<OwnedSessionLease>,
148 signal_handlers_installed: Arc<AtomicBool>,
149}
150
151impl OwnedSession {
152 #[must_use]
154 pub const fn cleanup_policy(&self) -> CleanupPolicy {
155 self.cleanup_policy
156 }
157
158 #[must_use]
163 pub const fn is_active(&self) -> bool {
164 self.session.is_some()
165 }
166
167 #[must_use]
173 pub fn lease_lost(&self) -> bool {
174 self.lease.as_ref().is_some_and(OwnedSessionLease::is_lost)
175 }
176
177 #[must_use]
179 pub fn lease_state(&self) -> LeaseState {
180 self.lease
181 .as_ref()
182 .map_or(LeaseState::NotLeased, OwnedSessionLease::state)
183 }
184
185 #[must_use]
190 pub fn lease_state_receiver(&self) -> Option<watch::Receiver<LeaseState>> {
191 self.lease.as_ref().map(OwnedSessionLease::subscribe)
192 }
193
194 pub async fn cleanup(&mut self) -> Result<bool> {
197 let Some(session) = self.session.as_ref() else {
198 return Ok(false);
199 };
200 match self.cleanup_policy {
201 CleanupPolicy::KillOnDrop | CleanupPolicy::KillOnOwnerExit => {
202 let killed = session.kill().await?;
203 self.session.take();
204 if let Some(lease) = self.lease.as_ref() {
205 lease.mark_not_leased();
206 }
207 self.lease.take();
208 Ok(killed)
209 }
210 CleanupPolicy::Preserve => Ok(false),
211 }
212 }
213
214 pub async fn shutdown_now(&mut self) -> Result<bool> {
219 self.cleanup().await
220 }
221
222 pub fn install_default_signal_handlers(&self) -> Result<OwnedSessionSignalHandlers> {
230 let Some(session) = self.session.as_ref() else {
231 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
232 "owned session no longer active".to_owned(),
233 )));
234 };
235 if self
236 .signal_handlers_installed
237 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
238 .is_err()
239 {
240 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
241 "owned session signal handlers are already installed".to_owned(),
242 )));
243 }
244
245 let transport = session.transport().clone();
246 let target = session.name().clone();
247 let installed = Arc::clone(&self.signal_handlers_installed);
248 signals::install_default_signal_handlers(transport, target, installed)
249 }
250
251 pub async fn preserve(mut self) -> Result<Self> {
253 self.release_lease_confirmed().await?;
254 self.cleanup_policy = CleanupPolicy::Preserve;
255 Ok(self)
256 }
257
258 pub async fn detach_owned(mut self) -> Result<Session> {
260 self.release_lease_confirmed().await?;
261 self.cleanup_policy = CleanupPolicy::Preserve;
262 Ok(self
263 .session
264 .take()
265 .expect("owned session must contain a session until detached"))
266 }
267
268 #[must_use]
270 pub fn try_session(&self) -> Option<&Session> {
271 self.session.as_ref()
272 }
273
274 #[must_use]
280 pub fn session(&self) -> &Session {
281 self.session
282 .as_ref()
283 .expect("owned session no longer contains a session")
284 }
285
286 async fn release_lease_confirmed(&mut self) -> Result<()> {
287 if let Some(lease) = self.lease.as_ref() {
288 lease.release_confirmed().await?;
289 }
290 self.lease.take();
291 Ok(())
292 }
293}
294
295#[derive(Debug)]
296struct OwnedSessionLease {
297 session_name: SessionName,
298 token: u64,
299 transport: TransportClient,
300 task: JoinHandle<()>,
301 lost: Arc<AtomicBool>,
302 state_tx: watch::Sender<LeaseState>,
303}
304
305impl OwnedSessionLease {
306 async fn start(session: &Session, ttl: Duration) -> Result<Self> {
307 let ttl_millis = ttl_millis(ttl)?;
308 let transport = session.transport().clone();
309 crate::capabilities::require(&transport, &[CAPABILITY_SDK_SESSION_LEASE]).await?;
310 let response = transport
311 .request(Request::CreateSessionLease(CreateSessionLeaseRequest {
312 session_name: session.name().clone(),
313 ttl_millis,
314 }))
315 .await?;
316 let Response::CreateSessionLease(response) = response else {
317 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
318 "daemon returned unexpected response for session lease create".to_owned(),
319 )));
320 };
321
322 let session_name = session.name().clone();
323 let token = response.token;
324 let renew_transport = transport.clone();
325 let renew_session_name = session_name.clone();
326 let lost = Arc::new(AtomicBool::new(false));
327 let renew_lost = Arc::clone(&lost);
328 let (state_tx, _) = watch::channel(LeaseState::Active);
329 let renew_state_tx = state_tx.clone();
330 let renew_interval = (ttl / 3).max(MIN_LEASE_RENEW_INTERVAL);
331 let task = tokio::spawn(async move {
332 let mut last_renew_success = tokio::time::Instant::now();
333 loop {
334 tokio::time::sleep(renew_interval).await;
335 let deadline = last_renew_success + ttl;
336 if !renew_lease_with_retries(
337 &renew_transport,
338 &renew_session_name,
339 token,
340 ttl_millis,
341 deadline,
342 )
343 .await
344 {
345 renew_lost.store(true, Ordering::Release);
346 let _ = renew_state_tx.send(LeaseState::Lost);
347 break;
348 }
349 last_renew_success = tokio::time::Instant::now();
350 }
351 });
352
353 Ok(Self {
354 session_name,
355 token,
356 transport,
357 task,
358 lost,
359 state_tx,
360 })
361 }
362
363 fn is_lost(&self) -> bool {
364 self.lost.load(Ordering::Acquire)
365 }
366
367 fn state(&self) -> LeaseState {
368 if self.is_lost() {
369 LeaseState::Lost
370 } else {
371 *self.state_tx.borrow()
372 }
373 }
374
375 fn subscribe(&self) -> watch::Receiver<LeaseState> {
376 self.state_tx.subscribe()
377 }
378
379 fn mark_not_leased(&self) {
380 let _ = self.state_tx.send(LeaseState::NotLeased);
381 }
382
383 fn mark_lost(&self) {
384 self.lost.store(true, Ordering::Release);
385 let _ = self.state_tx.send(LeaseState::Lost);
386 }
387
388 async fn release_confirmed(&self) -> Result<bool> {
389 if self.is_lost() {
390 return Err(RmuxError::from(
391 rmux_proto::RmuxError::owned_session_lease_lost(self.session_name.clone()),
392 ));
393 }
394
395 let response = self
396 .transport
397 .request(Request::ReleaseSessionLease(ReleaseSessionLeaseRequest {
398 session_name: self.session_name.clone(),
399 token: self.token,
400 }))
401 .await?;
402 let Response::ReleaseSessionLease(response) = response else {
403 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
404 "daemon returned unexpected response for session lease release".to_owned(),
405 )));
406 };
407 if response.released {
408 let _ = self.state_tx.send(LeaseState::NotLeased);
409 Ok(true)
410 } else {
411 self.mark_lost();
412 Err(RmuxError::from(
413 rmux_proto::RmuxError::owned_session_lease_lost(self.session_name.clone()),
414 ))
415 }
416 }
417}
418
419async fn renew_lease_with_retries(
420 transport: &TransportClient,
421 session_name: &SessionName,
422 token: u64,
423 ttl_millis: u64,
424 deadline: tokio::time::Instant,
425) -> bool {
426 let mut delay = MIN_LEASE_RENEW_INTERVAL;
427
428 loop {
429 match renew_lease_once(transport, session_name, token, ttl_millis).await {
430 Ok(true) => return true,
431 Ok(false) => return false,
432 Err(_) => {
433 let now = tokio::time::Instant::now();
434 if now >= deadline {
435 return false;
436 }
437 let remaining = deadline - now;
438 tokio::time::sleep(delay.min(remaining)).await;
439 delay = delay
440 .saturating_add(delay)
441 .min(MAX_LEASE_RENEW_RETRY_INTERVAL);
442 }
443 }
444 }
445}
446
447async fn renew_lease_once(
448 transport: &TransportClient,
449 session_name: &SessionName,
450 token: u64,
451 ttl_millis: u64,
452) -> Result<bool> {
453 match transport
454 .request(Request::RenewSessionLease(RenewSessionLeaseRequest {
455 session_name: session_name.clone(),
456 token,
457 ttl_millis,
458 }))
459 .await?
460 {
461 Response::RenewSessionLease(response) => Ok(response.renewed),
462 response => Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
463 "daemon returned `{}` response for session lease renew",
464 response.command_name()
465 )))),
466 }
467}
468
469impl Drop for OwnedSessionLease {
470 fn drop(&mut self) {
471 self.task.abort();
472 }
473}
474
475impl Deref for OwnedSession {
476 type Target = Session;
477
478 fn deref(&self) -> &Self::Target {
479 self.session()
480 }
481}
482
483impl Drop for OwnedSession {
484 fn drop(&mut self) {
485 if !matches!(
486 self.cleanup_policy,
487 CleanupPolicy::KillOnDrop | CleanupPolicy::KillOnOwnerExit
488 ) {
489 return;
490 }
491 let Some(session) = self.session.as_ref() else {
492 return;
493 };
494 let guard = DropGuard::best_effort(
495 session.transport().clone(),
496 Request::KillSession(KillSessionRequest {
497 target: session.name().clone(),
498 kill_all_except_target: false,
499 clear_alerts: false,
500 }),
501 );
502 drop(guard);
503 }
504}
505
506fn ttl_millis(ttl: Duration) -> Result<u64> {
507 validate_lease_ttl(ttl)?;
508 let millis = u64::try_from(ttl.as_millis()).map_err(|_| {
509 RmuxError::protocol(rmux_proto::RmuxError::Server(
510 "owned session lease ttl is too large".to_owned(),
511 ))
512 })?;
513 Ok(millis)
514}
515
516fn validate_lease_ttl(ttl: Duration) -> Result<()> {
517 let millis = u64::try_from(ttl.as_millis()).map_err(|_| {
518 RmuxError::protocol(rmux_proto::RmuxError::Server(
519 "owned session lease ttl is too large".to_owned(),
520 ))
521 })?;
522 if millis == 0 {
523 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
524 "owned session lease ttl must be greater than zero".to_owned(),
525 )));
526 }
527 if millis < rmux_proto::MIN_SESSION_LEASE_TTL_MILLIS {
528 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
529 "owned session lease ttl must be at least {}ms",
530 rmux_proto::MIN_SESSION_LEASE_TTL_MILLIS
531 ))));
532 }
533 Ok(())
534}
535
536fn is_missing_session(error: &RmuxError) -> bool {
537 matches!(
538 error,
539 RmuxError::Protocol {
540 source: rmux_proto::RmuxError::SessionNotFound(_),
541 }
542 )
543}