ferrite_session/internal/session/shared/
acquire.rs

1use tokio::task;
2
3use crate::internal::{
4  base::*,
5  functional::*,
6  protocol::{
7    End,
8    LinearToShared,
9    SendValue,
10    SharedToLinear,
11  },
12};
13
14pub fn async_acquire_shared_session<F>(
15  shared: SharedChannel<LinearToShared<F>>,
16  cont1: impl FnOnce(Z) -> PartialSession<(F::Applied, ()), End> + Send + 'static,
17) -> task::JoinHandle<()>
18where
19  F: Protocol,
20  F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
21  F::Applied: Protocol,
22{
23  debug!("[async_acquire_shared_session] acquiring shared session");
24
25  let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
26
27  task::spawn(async move {
28    let (provider_end_1, client_end_1) = End::create_endpoints();
29
30    let LinearToShared { linear } = receiver4.recv().await.unwrap();
31
32    let client_end_2 = linear.get_applied();
33
34    let cont2 = cont1(Z);
35
36    let ctx = (App::new(client_end_2), ());
37
38    unsafe_run_session(cont2, ctx, provider_end_1).await;
39
40    client_end_1.recv().await.unwrap();
41
42    receiver3.recv().await.unwrap();
43  })
44}
45
46pub fn async_acquire_shared_session_with_result<T, F>(
47  shared: SharedChannel<LinearToShared<F>>,
48  cont1: impl FnOnce(Z) -> PartialSession<(F::Applied, ()), SendValue<T, End>>
49    + Send
50    + 'static,
51) -> task::JoinHandle<T>
52where
53  F: Protocol,
54  T: Send + 'static,
55  F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
56  F::Applied: Protocol,
57{
58  debug!("[async_acquire_shared_session_with_result] acquiring shared session");
59
60  let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
61
62  task::spawn(async move {
63    let (provider_end_1, client_end_1) =
64      <SendValue<T, End>>::create_endpoints();
65
66    let LinearToShared { linear } = receiver4.recv().await.unwrap();
67
68    let client_end_2 = linear.get_applied();
69
70    let cont2 = cont1(Z);
71
72    let ctx = (App::new(client_end_2), ());
73
74    unsafe_run_session(cont2, ctx, provider_end_1).await;
75
76    receiver3.recv().await.unwrap();
77
78    let (Value(val), end_receiver) = client_end_1.recv().await.unwrap();
79
80    end_receiver.recv().await.unwrap();
81
82    val
83  })
84}
85
86pub fn acquire_shared_session<C, F, A>(
87  shared: SharedChannel<LinearToShared<F>>,
88  cont1: impl FnOnce(C::Length) -> PartialSession<C::Appended, A> + Send + 'static,
89) -> PartialSession<C, A>
90where
91  C: Context,
92  F: Protocol,
93  A: Protocol,
94  F::Applied: Protocol,
95  F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
96  C: AppendContext<(F::Applied, ())>,
97{
98  unsafe_create_session(move |ctx1, provider_end_1| async move {
99    let cont2 = cont1(C::Length::nat());
100
101    let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
102
103    debug!("[acquire_shared_session] acquiring shared endpoint");
104
105    receiver3.recv().await.unwrap();
106
107    debug!("[acquire_shared_session] acquired shared endpoint");
108
109    let LinearToShared { linear } = receiver4.recv().await.unwrap();
110
111    let client_end_2 = linear.get_applied();
112
113    let ctx2 = C::append_context(ctx1, (App::new(client_end_2), ()));
114
115    unsafe_run_session(cont2, ctx2, provider_end_1).await;
116  })
117}