slim_session/completion_handle.rs
1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use tokio::sync::oneshot;
11
12use crate::SessionError;
13
14/// The inner future type that can be awaited for completion.
15#[derive(Debug)]
16enum CompletionFuture {
17 /// A oneshot receiver for completion acknowledgments
18 OneshotReceiver(oneshot::Receiver<Result<(), SessionError>>),
19 /// A join handle for task-based completions
20 JoinHandle(tokio::task::JoinHandle<()>),
21}
22
23/// A handle to await the completion of an asynchronous operation.
24///
25/// This type wraps an internal future and can be directly awaited
26/// to check if an operation completed successfully. It is used for:
27/// - Message delivery acknowledgments
28/// - Session initialization (e.g., P2P handshakes)
29/// - Participant invitations
30/// - Participant removals
31///
32/// # Examples
33///
34/// Basic usage with oneshot receiver:
35/// ```ignore
36/// let ack = session.publish(...).await?;
37/// ack.await?; // Wait for delivery confirmation
38/// ```
39///
40/// Usage with join handle:
41/// ```ignore
42/// let handle = tokio::spawn(async {
43/// // Some async work
44/// Ok(())
45/// });
46/// let completion = CompletionHandle::from_join_handle(handle);
47/// completion.await?;
48/// ```
49#[derive(Debug)]
50pub struct CompletionHandle {
51 inner: CompletionFuture,
52}
53
54impl CompletionHandle {
55 /// Create a new completion handle from a oneshot receiver.
56 ///
57 /// This is the most common constructor, used for operations that use
58 /// a oneshot channel to signal completion.
59 pub fn from_oneshot_receiver(receiver: oneshot::Receiver<Result<(), SessionError>>) -> Self {
60 Self {
61 inner: CompletionFuture::OneshotReceiver(receiver),
62 }
63 }
64
65 /// Create a new completion handle from a join handle.
66 ///
67 /// This is used for operations that spawn a task and need to await
68 /// its completion.
69 pub fn from_join_handle(handle: tokio::task::JoinHandle<()>) -> Self {
70 Self {
71 inner: CompletionFuture::JoinHandle(handle),
72 }
73 }
74}
75
76impl Future for CompletionHandle {
77 type Output = Result<(), SessionError>;
78
79 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 let this = self.get_mut();
81
82 match &mut this.inner {
83 CompletionFuture::OneshotReceiver(receiver) => match Pin::new(receiver).poll(cx) {
84 Poll::Ready(Ok(result)) => Poll::Ready(result),
85 Poll::Ready(Err(e)) => Poll::Ready(Err(SessionError::AckReception(e.to_string()))),
86 Poll::Pending => Poll::Pending,
87 },
88 CompletionFuture::JoinHandle(handle) => match Pin::new(handle).poll(cx) {
89 Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
90 Poll::Ready(Err(e)) => Poll::Ready(Err(SessionError::AckReception(format!(
91 "Join handle error: {}",
92 e
93 )))),
94 Poll::Pending => Poll::Pending,
95 },
96 }
97 }
98}