ferrite_session/internal/session/shared/
acquire.rs1use tokio::task;
2
3use crate::internal::{
4 base::*,
5 functional::*,
6 protocol::{
7 End,
8 LinearToShared,
9 SendValue,
10 SharedToLinear,
11 },
12};
13
14pub fn async_acquire_shared_session<F>(
15 shared: SharedChannel<LinearToShared<F>>,
16 cont1: impl FnOnce(Z) -> PartialSession<(F::Applied, ()), End> + Send + 'static,
17) -> task::JoinHandle<()>
18where
19 F: Protocol,
20 F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
21 F::Applied: Protocol,
22{
23 debug!("[async_acquire_shared_session] acquiring shared session");
24
25 let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
26
27 task::spawn(async move {
28 let (provider_end_1, client_end_1) = End::create_endpoints();
29
30 let LinearToShared { linear } = receiver4.recv().await.unwrap();
31
32 let client_end_2 = linear.get_applied();
33
34 let cont2 = cont1(Z);
35
36 let ctx = (App::new(client_end_2), ());
37
38 unsafe_run_session(cont2, ctx, provider_end_1).await;
39
40 client_end_1.recv().await.unwrap();
41
42 receiver3.recv().await.unwrap();
43 })
44}
45
46pub fn async_acquire_shared_session_with_result<T, F>(
47 shared: SharedChannel<LinearToShared<F>>,
48 cont1: impl FnOnce(Z) -> PartialSession<(F::Applied, ()), SendValue<T, End>>
49 + Send
50 + 'static,
51) -> task::JoinHandle<T>
52where
53 F: Protocol,
54 T: Send + 'static,
55 F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
56 F::Applied: Protocol,
57{
58 debug!("[async_acquire_shared_session_with_result] acquiring shared session");
59
60 let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
61
62 task::spawn(async move {
63 let (provider_end_1, client_end_1) =
64 <SendValue<T, End>>::create_endpoints();
65
66 let LinearToShared { linear } = receiver4.recv().await.unwrap();
67
68 let client_end_2 = linear.get_applied();
69
70 let cont2 = cont1(Z);
71
72 let ctx = (App::new(client_end_2), ());
73
74 unsafe_run_session(cont2, ctx, provider_end_1).await;
75
76 receiver3.recv().await.unwrap();
77
78 let (Value(val), end_receiver) = client_end_1.recv().await.unwrap();
79
80 end_receiver.recv().await.unwrap();
81
82 val
83 })
84}
85
86pub fn acquire_shared_session<C, F, A>(
87 shared: SharedChannel<LinearToShared<F>>,
88 cont1: impl FnOnce(C::Length) -> PartialSession<C::Appended, A> + Send + 'static,
89) -> PartialSession<C, A>
90where
91 C: Context,
92 F: Protocol,
93 A: Protocol,
94 F::Applied: Protocol,
95 F: SharedRecApp<SharedToLinear<LinearToShared<F>>>,
96 C: AppendContext<(F::Applied, ())>,
97{
98 unsafe_create_session(move |ctx1, provider_end_1| async move {
99 let cont2 = cont1(C::Length::nat());
100
101 let (receiver3, receiver4) = unsafe_receive_shared_channel(shared);
102
103 debug!("[acquire_shared_session] acquiring shared endpoint");
104
105 receiver3.recv().await.unwrap();
106
107 debug!("[acquire_shared_session] acquired shared endpoint");
108
109 let LinearToShared { linear } = receiver4.recv().await.unwrap();
110
111 let client_end_2 = linear.get_applied();
112
113 let ctx2 = C::append_context(ctx1, (App::new(client_end_2), ()));
114
115 unsafe_run_session(cont2, ctx2, provider_end_1).await;
116 })
117}