Skip to main content

slim_session/
completion_handle.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use tokio::sync::oneshot;
11
12use crate::SessionError;
13
14/// The inner future type that can be awaited for completion.
15#[derive(Debug)]
16enum CompletionFuture {
17    /// A oneshot receiver for completion acknowledgments
18    OneshotReceiver(oneshot::Receiver<Result<(), SessionError>>),
19    /// A join handle for task-based completions
20    JoinHandle(tokio::task::JoinHandle<()>),
21}
22
23/// A handle to await the completion of an asynchronous operation.
24///
25/// This type wraps an internal future and can be directly awaited
26/// to check if an operation completed successfully. It is used for:
27/// - Message delivery acknowledgments
28/// - Session initialization (e.g., P2P handshakes)
29/// - Participant invitations
30/// - Participant removals
31///
32/// # Examples
33///
34/// Basic usage with oneshot receiver:
35/// ```ignore
36/// let ack = session.publish(...).await?;
37/// ack.await?; // Wait for delivery confirmation
38/// ```
39///
40/// Usage with join handle:
41/// ```ignore
42/// let handle = tokio::spawn(async {
43///     // Some async work
44///     Ok(())
45/// });
46/// let completion = CompletionHandle::from_join_handle(handle);
47/// completion.await?;
48/// ```
49#[derive(Debug)]
50pub struct CompletionHandle {
51    inner: CompletionFuture,
52}
53
54impl CompletionHandle {
55    /// Create a new completion handle from a oneshot receiver.
56    ///
57    /// This is the most common constructor, used for operations that use
58    /// a oneshot channel to signal completion.
59    pub fn from_oneshot_receiver(receiver: oneshot::Receiver<Result<(), SessionError>>) -> Self {
60        Self {
61            inner: CompletionFuture::OneshotReceiver(receiver),
62        }
63    }
64
65    /// Create a new completion handle from a join handle.
66    ///
67    /// This is used for operations that spawn a task and need to await
68    /// its completion.
69    pub fn from_join_handle(handle: tokio::task::JoinHandle<()>) -> Self {
70        Self {
71            inner: CompletionFuture::JoinHandle(handle),
72        }
73    }
74}
75
76impl Future for CompletionHandle {
77    type Output = Result<(), SessionError>;
78
79    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        let this = self.get_mut();
81
82        match &mut this.inner {
83            CompletionFuture::OneshotReceiver(receiver) => match Pin::new(receiver).poll(cx) {
84                Poll::Ready(Ok(result)) => Poll::Ready(result),
85                Poll::Ready(Err(e)) => Poll::Ready(Err(SessionError::AckReception(e.to_string()))),
86                Poll::Pending => Poll::Pending,
87            },
88            CompletionFuture::JoinHandle(handle) => match Pin::new(handle).poll(cx) {
89                Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
90                Poll::Ready(Err(e)) => Poll::Ready(Err(SessionError::AckReception(format!(
91                    "Join handle error: {}",
92                    e
93                )))),
94                Poll::Pending => Poll::Pending,
95            },
96        }
97    }
98}