sync_tokens/lib.rs
1// https://github.com/GWBasic/sync-tokens
2// (c) Andrew Rondeau
3// Apache 2.0 license
4// See https://github.com/GWBasic/sync-tokens/blob/main/LICENSE
5
//! sync-tokens provides ways to coordinate with running tasks: a way
//! to cleanly cancel a running task, and a way for a running task to
//! communicate when it is ready.
9//!
10//! ```toml
11//! # Example, use the version numbers you need
12//! sync-tokens = "0.1.0"
13//! async-std = { version = "1.7.0", features = ["attributes"] }
//! ```
15//!
16//! # Examples
17//!
//! Accepts incoming sockets on a background task. Communicates when
//! the listener is actively listening, and allows canceling the loop
//! that accepts incoming sockets.
21//!
22//! [See on github](https://github.com/GWBasic/sync-tokens-example)
23//!
24//! ```no_run
25//! use std::io::{ Error, ErrorKind };
26//!
27//! use async_std::io::Result;
28//! use async_std::net::{IpAddr, Ipv4Addr, TcpListener, TcpStream, SocketAddr};
29//! use async_std::task;
30//! use async_std::task::JoinHandle;
31//!
32//! use sync_tokens::cancelation_token::{ Cancelable, CancelationToken };
33//! use sync_tokens::completion_token::{ Completable, CompletionToken };
34//!
35//! // Starts running a server on a background task
36//! pub fn run_server() -> (JoinHandle<Result<()>>, CompletionToken<Result<SocketAddr>>, CancelationToken) {
37//! // This CompletionToken allows the caller to wait until the server is actually listening
38//! // The caller gets completion_token, which it can await on
39//! // completable is used to signal to completion_token
40//! let (completion_token, completable) = CompletionToken::new();
41//!
42//! // This CancelationToken allows the caller to stop the server
43//! // The caller gets cancelation_token
44//! // cancelable is used to allow canceling a call to await
45//! let (cancelation_token, cancelable) = CancelationToken::new();
46//!
//! // The server is started on a background task, and its future is returned
48//! let server_future = task::spawn(run_server_int(completable, cancelable));
49//!
50//! (server_future, completion_token, cancelation_token)
51//! }
52//!
53//! async fn run_server_int(completable: Completable<Result<SocketAddr>>, cancelable: Cancelable) -> Result<()> {
54//!
55//! let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
56//! let listener = TcpListener::bind(socket_addr).await?;
57//!
58//! // Inform that the server is listening
59//! let local_addr = listener.local_addr();
60//! completable.complete(local_addr);
61//!
62//! // Create a future that waits for an incoming socket
63//! let mut incoming_future = task::spawn(accept(listener));
64//!
65//! loop {
66//! // Wait for either the incoming socket (via incoming_future) or for the CancelationToken
67//! // to be canceled.
68//! // When the CancelationToken is canceled, the error is returned
69//! let (listener, _) = cancelable.allow_cancel(
70//! incoming_future,
71//! Err(Error::new(ErrorKind::Interrupted, "Server terminated")))
72//! .await?;
73//!
74//! incoming_future = task::spawn(accept(listener));
75//! }
76//! }
77//!
78//! async fn accept(listener: TcpListener) -> Result<(TcpListener, TcpStream)> {
79//! let (stream, _) = listener.accept().await?;
80//! Ok((listener, stream))
81//! }
82//!
83//! #[async_std::main]
84//! async fn main() {
85//! let (server_future, completion_token, cancelation_token) = run_server();
86//!
87//! println!("Server is starting");
88//!
89//! // Wait for the server to start
90//! let local_addr = completion_token.await.unwrap();
91//!
92//! println!("Server is listening at {}", local_addr);
93//! println!("Push Return to stop the server");
94//!
95//! let _ = std::io::stdin().read_line(&mut String::new()).unwrap();
96//!
97//! // Stop the server
98//! cancelation_token.cancel();
99//!
100//! // Wait for the server to shut down
101//! let err = server_future.await.unwrap_err();
102//!
103//! println!("Server ended: {}", err);
104//! }
105//! ```
106
107#![cfg_attr(feature = "docs", feature(doc_cfg))]
108#![warn(missing_docs)]
109#![warn(missing_debug_implementations, rust_2018_idioms)]
110#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
111#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
112
113pub mod cancelation_token;
114pub mod completion_token;
115
/// Test-only scaffolding: a waker that does nothing except record that it was woken.
#[cfg(test)]
mod tests {

    use std::sync::{Arc, Mutex};

    use cooked_waker::{ViaRawPointer, Wake, WakeRef};

    /// Waker for tests. Waking it sets a flag stored behind an `Arc<Mutex<..>>`,
    /// so every clone of the waker observes and updates the same flag.
    #[derive(Debug, Clone)]
    pub struct TestWaker {
        shared_state: Arc<Mutex<TestWakerState>>,
    }

    /// The state written by `TestWaker` when it is woken.
    #[derive(Debug, Clone)]
    struct TestWakerState {
        woke: bool,
    }

    impl TestWaker {
        /// Creates a waker whose `woke` flag starts out `false`.
        pub fn new() -> TestWaker {
            let initial = TestWakerState { woke: false };
            let shared_state = Arc::new(Mutex::new(initial));

            TestWaker { shared_state }
        }
    }

    impl WakeRef for TestWaker {
        /// Sets the shared `woke` flag to `true`.
        ///
        /// Panics if the mutex guarding the state is poisoned.
        fn wake_by_ref(&self) {
            // The temporary guard is released at the end of this statement.
            self.shared_state.lock().unwrap().woke = true;
        }
    }

    impl Wake for TestWaker {
        /// Consuming wake: identical to waking by reference, after which `self` is dropped.
        fn wake(self) {
            WakeRef::wake_by_ref(&self);
        }
    }

    impl ViaRawPointer for TestWaker {
        type Target = ();

        /// Gives up ownership of the inner `Arc` and returns its pointer, type-erased.
        fn into_raw(self) -> *mut () {
            Arc::into_raw(self.shared_state) as *mut ()
        }

        /// Rebuilds a `TestWaker` from a pointer produced by `into_raw`.
        ///
        /// # Safety
        ///
        /// `ptr` is handed to `Arc::from_raw`, so it must have been obtained from
        /// `Arc::into_raw` on an `Arc<Mutex<TestWakerState>>` (which is what
        /// `into_raw` above returns), and each such pointer must be converted back
        /// at most once.
        unsafe fn from_raw(ptr: *mut ()) -> Self {
            // Restore the concrete pointee type that `into_raw` erased.
            let state_ptr = ptr as *const Mutex<TestWakerState>;

            TestWaker {
                shared_state: Arc::from_raw(state_ptr),
            }
        }
    }
}