Skip to main content

rmux_sdk/handles/
owned_session.rs

1//! App-owned session guard.
2
3mod signals;
4
5use std::future::{Future, IntoFuture};
6use std::ops::Deref;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15use crate::transport::{DropGuard, TransportClient};
16use crate::{EnsureSession, Result, RmuxError, Session, SessionName};
17use rmux_proto::{
18    CreateSessionLeaseRequest, KillSessionRequest, ReleaseSessionLeaseRequest,
19    RenewSessionLeaseRequest, Request, Response, CAPABILITY_SDK_SESSION_LEASE,
20};
21
22use super::Rmux;
23pub use signals::OwnedSessionSignalHandlers;
24
/// Lease TTL a new [`OwnedSessionBuilder`] starts with.
const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(5);
/// Lower bound for the pause between lease renewals; also the first retry
/// delay after a failed renewal attempt.
const MIN_LEASE_RENEW_INTERVAL: Duration = Duration::from_millis(100);
/// Upper bound for the retry delay between failed renewal attempts.
const MAX_LEASE_RENEW_RETRY_INTERVAL: Duration = Duration::from_millis(250);
28
/// How an [`OwnedSession`] disposes of the session it owns.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum CleanupPolicy {
    /// Kill the session when cleanup is requested explicitly, and make a
    /// best-effort attempt to kill it when the owner is dropped.
    #[default]
    KillOnDrop,
    /// Kill the session once the owner stops renewing its daemon-side lease.
    KillOnOwnerExit,
    /// Leave the session running after the owner is dropped.
    Preserve,
}
41
/// Daemon lease state that callers can observe on an [`OwnedSession`].
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum LeaseState {
    /// No daemon-side lease exists: either the owned session was created
    /// without one, or the lease was released successfully.
    #[default]
    NotLeased,
    /// A daemon-side lease exists and the SDK heartbeat keeps renewing it.
    Active,
    /// The SDK heartbeat hit a terminal failure while renewing the lease.
    Lost,
}
55
56/// Builder returned by [`Rmux::owned_session`].
57#[derive(Debug)]
58pub struct OwnedSessionBuilder<'a> {
59    rmux: &'a Rmux,
60    name: SessionName,
61    replace_existing: bool,
62    cleanup_policy: CleanupPolicy,
63    lease_ttl: Duration,
64}
65
66impl<'a> OwnedSessionBuilder<'a> {
67    pub(crate) const fn new(rmux: &'a Rmux, name: SessionName) -> Self {
68        Self {
69            rmux,
70            name,
71            replace_existing: false,
72            cleanup_policy: CleanupPolicy::KillOnDrop,
73            lease_ttl: DEFAULT_LEASE_TTL,
74        }
75    }
76
77    /// Kills an existing session with the same name before creating the new
78    /// owned session.
79    #[must_use]
80    pub const fn replace_existing(mut self, replace_existing: bool) -> Self {
81        self.replace_existing = replace_existing;
82        self
83    }
84
85    /// Sets the cleanup policy for the owned session.
86    #[must_use]
87    pub const fn cleanup_policy(mut self, cleanup_policy: CleanupPolicy) -> Self {
88        self.cleanup_policy = cleanup_policy;
89        self
90    }
91
92    /// Sets the heartbeat lease TTL used by
93    /// [`CleanupPolicy::KillOnOwnerExit`].
94    #[must_use]
95    pub const fn lease_ttl(mut self, ttl: Duration) -> Self {
96        self.lease_ttl = ttl;
97        self
98    }
99
100    async fn run(self) -> Result<OwnedSession> {
101        if self.cleanup_policy == CleanupPolicy::KillOnOwnerExit {
102            validate_lease_ttl(self.lease_ttl)?;
103        }
104
105        if self.replace_existing {
106            match self.rmux.session(self.name.clone()).await {
107                Ok(session) => {
108                    let _ = session.kill().await?;
109                }
110                Err(error) if is_missing_session(&error) => {}
111                Err(error) => return Err(error),
112            }
113        }
114
115        let session = self
116            .rmux
117            .ensure_session(EnsureSession::named(self.name).create_only().detached(true))
118            .await?;
119        let lease = if self.cleanup_policy == CleanupPolicy::KillOnOwnerExit {
120            Some(OwnedSessionLease::start(&session, self.lease_ttl).await?)
121        } else {
122            None
123        };
124        Ok(OwnedSession {
125            session: Some(session),
126            cleanup_policy: self.cleanup_policy,
127            lease,
128            signal_handlers_installed: Arc::new(AtomicBool::new(false)),
129        })
130    }
131}
132
133impl<'a> IntoFuture for OwnedSessionBuilder<'a> {
134    type Output = Result<OwnedSession>;
135    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
136
137    fn into_future(self) -> Self::IntoFuture {
138        Box::pin(self.run())
139    }
140}
141
142/// A session whose lifetime is owned by the SDK caller.
143#[derive(Debug)]
144pub struct OwnedSession {
145    session: Option<Session>,
146    cleanup_policy: CleanupPolicy,
147    lease: Option<OwnedSessionLease>,
148    signal_handlers_installed: Arc<AtomicBool>,
149}
150
151impl OwnedSession {
152    /// Returns the configured cleanup policy.
153    #[must_use]
154    pub const fn cleanup_policy(&self) -> CleanupPolicy {
155        self.cleanup_policy
156    }
157
158    /// Returns true while this owner still contains a live session handle.
159    ///
160    /// This becomes false after a successful [`Self::cleanup`] or after
161    /// [`Self::detach_owned`] consumes the owner.
162    #[must_use]
163    pub const fn is_active(&self) -> bool {
164        self.session.is_some()
165    }
166
167    /// Returns true once the daemon-side owner lease renewal task has observed
168    /// a terminal lease loss.
169    ///
170    /// This is only meaningful for [`CleanupPolicy::KillOnOwnerExit`]. A true
171    /// value means the daemon may reap the session after the configured TTL.
172    #[must_use]
173    pub fn lease_lost(&self) -> bool {
174        self.lease.as_ref().is_some_and(OwnedSessionLease::is_lost)
175    }
176
177    /// Returns the current daemon-side lease state.
178    #[must_use]
179    pub fn lease_state(&self) -> LeaseState {
180        self.lease
181            .as_ref()
182            .map_or(LeaseState::NotLeased, OwnedSessionLease::state)
183    }
184
185    /// Subscribes to daemon-side lease state changes.
186    ///
187    /// Returns `None` for sessions that were not created with
188    /// [`CleanupPolicy::KillOnOwnerExit`].
189    #[must_use]
190    pub fn lease_state_receiver(&self) -> Option<watch::Receiver<LeaseState>> {
191        self.lease.as_ref().map(OwnedSessionLease::subscribe)
192    }
193
194    /// Explicitly kills the owned session when the policy is not
195    /// [`CleanupPolicy::Preserve`].
196    pub async fn cleanup(&mut self) -> Result<bool> {
197        let Some(session) = self.session.as_ref() else {
198            return Ok(false);
199        };
200        match self.cleanup_policy {
201            CleanupPolicy::KillOnDrop | CleanupPolicy::KillOnOwnerExit => {
202                let killed = session.kill().await?;
203                self.session.take();
204                if let Some(lease) = self.lease.as_ref() {
205                    lease.mark_not_leased();
206                }
207                self.lease.take();
208                Ok(killed)
209            }
210            CleanupPolicy::Preserve => Ok(false),
211        }
212    }
213
214    /// Immediately runs the same cleanup path as [`Self::cleanup`].
215    ///
216    /// This is a naming convenience for apps that already own their signal or
217    /// cancellation handling and want an explicit shutdown hook.
218    pub async fn shutdown_now(&mut self) -> Result<bool> {
219        self.cleanup().await
220    }
221
222    /// Installs opt-in process signal handling for this owned session.
223    ///
224    /// The SDK never installs signal handlers by default. This helper listens
225    /// for Ctrl-C on every platform, and for SIGTERM/SIGHUP on Unix, then asks
226    /// the daemon to kill the session. Dropping the returned guard aborts the
227    /// background listener. Only one guard may be installed at a time; a second
228    /// call returns an error until the first guard is dropped.
229    pub fn install_default_signal_handlers(&self) -> Result<OwnedSessionSignalHandlers> {
230        let Some(session) = self.session.as_ref() else {
231            return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
232                "owned session no longer active".to_owned(),
233            )));
234        };
235        if self
236            .signal_handlers_installed
237            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
238            .is_err()
239        {
240            return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
241                "owned session signal handlers are already installed".to_owned(),
242            )));
243        }
244
245        let transport = session.transport().clone();
246        let target = session.name().clone();
247        let installed = Arc::clone(&self.signal_handlers_installed);
248        signals::install_default_signal_handlers(transport, target, installed)
249    }
250
251    /// Switches this owner to preserve mode after confirming lease release.
252    pub async fn preserve(mut self) -> Result<Self> {
253        self.release_lease_confirmed().await?;
254        self.cleanup_policy = CleanupPolicy::Preserve;
255        Ok(self)
256    }
257
258    /// Detaches the guard and returns the underlying persistent session.
259    pub async fn detach_owned(mut self) -> Result<Session> {
260        self.release_lease_confirmed().await?;
261        self.cleanup_policy = CleanupPolicy::Preserve;
262        Ok(self
263            .session
264            .take()
265            .expect("owned session must contain a session until detached"))
266    }
267
268    /// Returns the underlying session handle if the owner still has one.
269    #[must_use]
270    pub fn try_session(&self) -> Option<&Session> {
271        self.session.as_ref()
272    }
273
274    /// Returns the underlying session handle.
275    ///
276    /// Panics after successful [`Self::cleanup`] because there is no longer an
277    /// owned session handle. Use [`Self::try_session`] or [`Self::is_active`]
278    /// when the owner may have been cleaned up already.
279    #[must_use]
280    pub fn session(&self) -> &Session {
281        self.session
282            .as_ref()
283            .expect("owned session no longer contains a session")
284    }
285
286    async fn release_lease_confirmed(&mut self) -> Result<()> {
287        if let Some(lease) = self.lease.as_ref() {
288            lease.release_confirmed().await?;
289        }
290        self.lease.take();
291        Ok(())
292    }
293}
294
295#[derive(Debug)]
296struct OwnedSessionLease {
297    session_name: SessionName,
298    token: u64,
299    transport: TransportClient,
300    task: JoinHandle<()>,
301    lost: Arc<AtomicBool>,
302    state_tx: watch::Sender<LeaseState>,
303}
304
305impl OwnedSessionLease {
306    async fn start(session: &Session, ttl: Duration) -> Result<Self> {
307        let ttl_millis = ttl_millis(ttl)?;
308        let transport = session.transport().clone();
309        crate::capabilities::require(&transport, &[CAPABILITY_SDK_SESSION_LEASE]).await?;
310        let response = transport
311            .request(Request::CreateSessionLease(CreateSessionLeaseRequest {
312                session_name: session.name().clone(),
313                ttl_millis,
314            }))
315            .await?;
316        let Response::CreateSessionLease(response) = response else {
317            return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
318                "daemon returned unexpected response for session lease create".to_owned(),
319            )));
320        };
321
322        let session_name = session.name().clone();
323        let token = response.token;
324        let renew_transport = transport.clone();
325        let renew_session_name = session_name.clone();
326        let lost = Arc::new(AtomicBool::new(false));
327        let renew_lost = Arc::clone(&lost);
328        let (state_tx, _) = watch::channel(LeaseState::Active);
329        let renew_state_tx = state_tx.clone();
330        let renew_interval = (ttl / 3).max(MIN_LEASE_RENEW_INTERVAL);
331        let task = tokio::spawn(async move {
332            let mut last_renew_success = tokio::time::Instant::now();
333            loop {
334                tokio::time::sleep(renew_interval).await;
335                let deadline = last_renew_success + ttl;
336                if !renew_lease_with_retries(
337                    &renew_transport,
338                    &renew_session_name,
339                    token,
340                    ttl_millis,
341                    deadline,
342                )
343                .await
344                {
345                    renew_lost.store(true, Ordering::Release);
346                    let _ = renew_state_tx.send(LeaseState::Lost);
347                    break;
348                }
349                last_renew_success = tokio::time::Instant::now();
350            }
351        });
352
353        Ok(Self {
354            session_name,
355            token,
356            transport,
357            task,
358            lost,
359            state_tx,
360        })
361    }
362
363    fn is_lost(&self) -> bool {
364        self.lost.load(Ordering::Acquire)
365    }
366
367    fn state(&self) -> LeaseState {
368        if self.is_lost() {
369            LeaseState::Lost
370        } else {
371            *self.state_tx.borrow()
372        }
373    }
374
375    fn subscribe(&self) -> watch::Receiver<LeaseState> {
376        self.state_tx.subscribe()
377    }
378
379    fn mark_not_leased(&self) {
380        let _ = self.state_tx.send(LeaseState::NotLeased);
381    }
382
383    fn mark_lost(&self) {
384        self.lost.store(true, Ordering::Release);
385        let _ = self.state_tx.send(LeaseState::Lost);
386    }
387
388    async fn release_confirmed(&self) -> Result<bool> {
389        if self.is_lost() {
390            return Err(RmuxError::from(
391                rmux_proto::RmuxError::owned_session_lease_lost(self.session_name.clone()),
392            ));
393        }
394
395        let response = self
396            .transport
397            .request(Request::ReleaseSessionLease(ReleaseSessionLeaseRequest {
398                session_name: self.session_name.clone(),
399                token: self.token,
400            }))
401            .await?;
402        let Response::ReleaseSessionLease(response) = response else {
403            return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
404                "daemon returned unexpected response for session lease release".to_owned(),
405            )));
406        };
407        if response.released {
408            let _ = self.state_tx.send(LeaseState::NotLeased);
409            Ok(true)
410        } else {
411            self.mark_lost();
412            Err(RmuxError::from(
413                rmux_proto::RmuxError::owned_session_lease_lost(self.session_name.clone()),
414            ))
415        }
416    }
417}
418
419async fn renew_lease_with_retries(
420    transport: &TransportClient,
421    session_name: &SessionName,
422    token: u64,
423    ttl_millis: u64,
424    deadline: tokio::time::Instant,
425) -> bool {
426    let mut delay = MIN_LEASE_RENEW_INTERVAL;
427
428    loop {
429        match renew_lease_once(transport, session_name, token, ttl_millis).await {
430            Ok(true) => return true,
431            Ok(false) => return false,
432            Err(_) => {
433                let now = tokio::time::Instant::now();
434                if now >= deadline {
435                    return false;
436                }
437                let remaining = deadline - now;
438                tokio::time::sleep(delay.min(remaining)).await;
439                delay = delay
440                    .saturating_add(delay)
441                    .min(MAX_LEASE_RENEW_RETRY_INTERVAL);
442            }
443        }
444    }
445}
446
447async fn renew_lease_once(
448    transport: &TransportClient,
449    session_name: &SessionName,
450    token: u64,
451    ttl_millis: u64,
452) -> Result<bool> {
453    match transport
454        .request(Request::RenewSessionLease(RenewSessionLeaseRequest {
455            session_name: session_name.clone(),
456            token,
457            ttl_millis,
458        }))
459        .await?
460    {
461        Response::RenewSessionLease(response) => Ok(response.renewed),
462        response => Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
463            "daemon returned `{}` response for session lease renew",
464            response.command_name()
465        )))),
466    }
467}
468
469impl Drop for OwnedSessionLease {
470    fn drop(&mut self) {
471        self.task.abort();
472    }
473}
474
475impl Deref for OwnedSession {
476    type Target = Session;
477
478    fn deref(&self) -> &Self::Target {
479        self.session()
480    }
481}
482
483impl Drop for OwnedSession {
484    fn drop(&mut self) {
485        if !matches!(
486            self.cleanup_policy,
487            CleanupPolicy::KillOnDrop | CleanupPolicy::KillOnOwnerExit
488        ) {
489            return;
490        }
491        let Some(session) = self.session.as_ref() else {
492            return;
493        };
494        let guard = DropGuard::best_effort(
495            session.transport().clone(),
496            Request::KillSession(KillSessionRequest {
497                target: session.name().clone(),
498                kill_all_except_target: false,
499                clear_alerts: false,
500            }),
501        );
502        drop(guard);
503    }
504}
505
506fn ttl_millis(ttl: Duration) -> Result<u64> {
507    validate_lease_ttl(ttl)?;
508    let millis = u64::try_from(ttl.as_millis()).map_err(|_| {
509        RmuxError::protocol(rmux_proto::RmuxError::Server(
510            "owned session lease ttl is too large".to_owned(),
511        ))
512    })?;
513    Ok(millis)
514}
515
516fn validate_lease_ttl(ttl: Duration) -> Result<()> {
517    let millis = u64::try_from(ttl.as_millis()).map_err(|_| {
518        RmuxError::protocol(rmux_proto::RmuxError::Server(
519            "owned session lease ttl is too large".to_owned(),
520        ))
521    })?;
522    if millis == 0 {
523        return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(
524            "owned session lease ttl must be greater than zero".to_owned(),
525        )));
526    }
527    if millis < rmux_proto::MIN_SESSION_LEASE_TTL_MILLIS {
528        return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
529            "owned session lease ttl must be at least {}ms",
530            rmux_proto::MIN_SESSION_LEASE_TTL_MILLIS
531        ))));
532    }
533    Ok(())
534}
535
536fn is_missing_session(error: &RmuxError) -> bool {
537    matches!(
538        error,
539        RmuxError::Protocol {
540            source: rmux_proto::RmuxError::SessionNotFound(_),
541        }
542    )
543}