sync_tokens/
lib.rs

1// https://github.com/GWBasic/sync-tokens
2// (c) Andrew Rondeau
3// Apache 2.0 license
4// See https://github.com/GWBasic/sync-tokens/blob/main/LICENSE
5
//! sync-tokens provides ways to coordinate with running tasks: a way
//! to cleanly cancel a running task, and a way for a running task to
//! signal when it is ready.
9//! 
10//! ```toml
11//! # Example, use the version numbers you need
12//! sync-tokens = "0.1.0"
13//! async-std = { version = "1.7.0", features = ["attributes"] }
//! ```
15//! 
16//! # Examples
17//! 
//! This example accepts incoming sockets on a background task. It signals
//! when the listener is actively listening, and allows canceling the loop
//! that accepts incoming sockets.
21//! 
22//! [See on github](https://github.com/GWBasic/sync-tokens-example)
23//! 
24//! ```no_run
25//! use std::io::{ Error, ErrorKind };
26//!
27//! use async_std::io::Result;
28//! use async_std::net::{IpAddr, Ipv4Addr, TcpListener, TcpStream, SocketAddr};
29//! use async_std::task;
30//! use async_std::task::JoinHandle;
31//! 
32//! use sync_tokens::cancelation_token::{ Cancelable, CancelationToken };
33//! use sync_tokens::completion_token::{ Completable, CompletionToken };
34//! 
35//! // Starts running a server on a background task
36//! pub fn run_server() -> (JoinHandle<Result<()>>, CompletionToken<Result<SocketAddr>>, CancelationToken) {
37//!     // This CompletionToken allows the caller to wait until the server is actually listening
38//!     // The caller gets completion_token, which it can await on
39//!     // completable is used to signal to completion_token
40//!     let (completion_token, completable) = CompletionToken::new();
41//! 
42//!     // This CancelationToken allows the caller to stop the server
43//!     // The caller gets cancelation_token
44//!     // cancelable is used to allow canceling a call to await
45//!     let (cancelation_token, cancelable) = CancelationToken::new();
46//! 
//!     // The server is started on a background task, and its future is returned
48//!     let server_future = task::spawn(run_server_int(completable, cancelable));
49//! 
50//!     (server_future, completion_token, cancelation_token)
51//! }
52//! 
53//! async fn run_server_int(completable: Completable<Result<SocketAddr>>, cancelable: Cancelable) -> Result<()> {
54//! 
55//!     let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
56//!     let listener = TcpListener::bind(socket_addr).await?;
57//! 
58//!     // Inform that the server is listening
59//!     let local_addr = listener.local_addr();
60//!     completable.complete(local_addr);
61//! 
62//!     // Create a future that waits for an incoming socket
63//!     let mut incoming_future = task::spawn(accept(listener));
64//!     
65//!     loop {
66//!         // Wait for either the incoming socket (via incoming_future) or for the CancelationToken
67//!         // to be canceled.
68//!         // When the CancelationToken is canceled, the error is returned
69//!         let (listener, _) = cancelable.allow_cancel(
70//!             incoming_future, 
71//!             Err(Error::new(ErrorKind::Interrupted, "Server terminated")))
72//!             .await?;
73//! 
74//!         incoming_future = task::spawn(accept(listener));
75//!     }
76//! }
77//! 
78//! async fn accept(listener: TcpListener) -> Result<(TcpListener, TcpStream)> {
79//!     let (stream, _) = listener.accept().await?;
80//!     Ok((listener, stream))
81//! }
82//! 
83//! #[async_std::main]
84//! async fn main() {
85//!     let (server_future, completion_token, cancelation_token) = run_server();
86//! 
87//!     println!("Server is starting");
88//! 
89//!     // Wait for the server to start
90//!     let local_addr = completion_token.await.unwrap();
91//! 
92//!     println!("Server is listening at {}", local_addr);
93//!     println!("Push Return to stop the server");
94//! 
95//!     let _ = std::io::stdin().read_line(&mut String::new()).unwrap();
96//! 
97//!     // Stop the server
98//!     cancelation_token.cancel();
99//! 
100//!     // Wait for the server to shut down
101//!     let err = server_future.await.unwrap_err();
102//! 
103//!     println!("Server ended: {}", err);
104//! }
105//! ```
106
107#![cfg_attr(feature = "docs", feature(doc_cfg))]
108#![warn(missing_docs)]
109#![warn(missing_debug_implementations, rust_2018_idioms)]
110#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
111#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
112
113pub mod cancelation_token;
114pub mod completion_token;
115
/// Test-only helpers: a waker that records that it has been woken.
#[cfg(test)]
mod tests {

    use std::sync::{Arc, Mutex};

    use cooked_waker::{ViaRawPointer, Wake, WakeRef};

    /// A waker whose only effect when woken is to set a flag.
    ///
    /// The flag lives behind an `Arc<Mutex<_>>`, so every clone of a
    /// `TestWaker` reads and writes the same state.
    #[derive(Debug, Clone)]
    pub struct TestWaker {
        shared_state: Arc<Mutex<TestWakerState>>,
    }

    /// The state shared between all clones of one `TestWaker`.
    #[derive(Debug, Clone)]
    struct TestWakerState {
        /// `false` on construction; set to `true` by `wake_by_ref`.
        woke: bool,
    }

    impl TestWaker {
        /// Creates a waker that has not been woken yet.
        pub fn new() -> Self {
            let initial_state = TestWakerState { woke: false };
            let shared_state = Arc::new(Mutex::new(initial_state));

            Self { shared_state }
        }
    }

    impl WakeRef for TestWaker {
        /// Records the wake-up by setting the shared `woke` flag.
        ///
        /// Panics if the mutex guarding the shared state is poisoned.
        fn wake_by_ref(&self) {
            // The guard is a temporary, so the lock is released at the end of this statement.
            self.shared_state.lock().unwrap().woke = true;
        }
    }

    impl Wake for TestWaker {
        /// Consuming wake: does the same as `wake_by_ref`, then drops `self`.
        fn wake(self) {
            WakeRef::wake_by_ref(&self);
        }
    }

    impl ViaRawPointer for TestWaker {
        type Target = ();

        /// Turns the waker into a type-erased raw pointer to its shared state.
        ///
        /// Ownership of the `Arc` reference moves into the returned pointer;
        /// it is reclaimed by passing the pointer to `from_raw`.
        fn into_raw(self) -> *mut () {
            Arc::into_raw(self.shared_state) as *mut ()
        }

        /// Rebuilds a waker from a pointer produced by `into_raw`.
        ///
        /// # Safety
        ///
        /// `ptr` must have been returned by `TestWaker::into_raw`, and the
        /// reference it carries must not already have been reclaimed; this is
        /// the contract of `Arc::from_raw`, which this function calls.
        unsafe fn from_raw(ptr: *mut ()) -> Self {
            // Undo the type erasure performed in `into_raw`.
            let state_ptr = ptr as *const Mutex<TestWakerState>;

            Self {
                shared_state: Arc::from_raw(state_ptr),
            }
        }
    }
}