aselect 0.4.0

Opinionated replacement for tokio::select!, avoiding certain pitfalls.
# Async application main loops

## Background

This document takes a look at an async Rust application and illustrates a few pitfalls. In part 2,
we take a look at how these challenges can be overcome using the `aselect` crate.

## Scenario

Let's say you're writing a server for an embedded system. The system is controlled through a custom
protocol built on top of TCP. The hardware has a temperature sensor and a heater. A client should be able
to read the temperature and control the heater.

Let's start by writing a simple server. We'll assume there's only a single client at a time, for this example.

### A starting point

An initial minimal main loop might look something like this:

```rust
use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;
use std::io::Result;

async fn set_heater_power(power: u8) { /* implementation */ }

async fn run_server(stream: &mut TcpStream) -> Result<()> {
    let (mut reader, _writer) = stream.split();
    
    loop {

        let cmd: u8 = reader.read_u8().await?;
        match cmd {
            1 => {
                let power: u8 = reader.read_u8().await?;
                set_heater_power(power).await;
            }
            _ => {
                // Unknown command: Do error handling
            }
        }
    }
}
```

The above example works reliably. Clients write a 1-byte command to adjust power, followed by a 1-byte parameter
carrying the desired power. While `set_heater_power` is executing, no further commands are processed; however,
this is acceptable for now.
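To make the wire format concrete, here is a sketch of how a client could build the SetPower frame. The helper name is ours, not part of any protocol library:

```rust
/// Build the two-byte SetPower frame: command byte 1, then the power value.
fn encode_set_power(power: u8) -> [u8; 2] {
    [1, power]
}

fn main() {
    // A client would write these two bytes to the TCP stream, and the
    // server loop above would read them back out one at a time.
    assert_eq!(encode_set_power(200), [1, 200]);
}
```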

### Adding a query feature

Now, let's add a way to query the current temperature:

```rust

async fn set_heater_power(power: u8) {}
async fn measure_temperature() -> u8 {42}

use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use std::io::Result;

async fn run_server(stream: &mut TcpStream) -> Result<()> {
    let (mut reader, mut writer) = stream.split();
    
    loop {

        let cmd: u8 = reader.read_u8().await?;
        match cmd {
            1 => {
                let power: u8 = reader.read_u8().await?;
                set_heater_power(power).await;
            }
            2 => { // Query temperature
                let temperature = measure_temperature().await;
                writer.write_u8(2).await?;
                writer.write_u8(temperature).await?;
            }
            _ => {
                // Unknown command: Do error handling
            }
        }
    }
}
```

This adds a second type of command, encoded as a byte with value 2. Upon receiving such a command, the server
writes a magic byte (2) back, followed by the temperature encoded as a byte. Let's not worry about units or
data types for physical quantities in this example.
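For illustration, a client-side decoder for this response frame might look like the sketch below; again, the helper name is hypothetical:

```rust
/// Parse a two-byte temperature response: magic byte 2, then the value.
/// Returns None if the magic byte doesn't match.
fn decode_temperature_response(frame: [u8; 2]) -> Option<u8> {
    if frame[0] == 2 { Some(frame[1]) } else { None }
}

fn main() {
    assert_eq!(decode_temperature_response([2, 42]), Some(42));
    assert_eq!(decode_temperature_response([9, 42]), None); // wrong magic byte
}
```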

### Adding an alarm feature

Now, let's add an alarm feature. Whenever the temperature exceeds 100, the client should be notified immediately,
without having to perform a request. To achieve this, we need a primitive that allows us to monitor two different
futures for completion: The temperature (from the hardware itself), and incoming requests (from the client).

Tokio provides such a primitive: the `tokio::select!` macro.

Our program might now look something like this:

```rust

async fn set_heater_power(power: u8) {}
async fn measure_temperature() -> u8 {42}
/// Waits for the temperature to change, then returns the new value
async fn wait_temperature_alarm() -> u8 {42}

use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use std::io::Result;
use tokio::select;

async fn run_server(stream: &mut TcpStream) -> Result<()> {
    let (mut reader, mut writer) = stream.split();
    
    loop {
        select!{
            cmd = reader.read_u8() => {
                match cmd? {
                    1 => {
                        let power: u8 = reader.read_u8().await?;
                        set_heater_power(power).await;
                    }
                    2 => { // Query temperature
                        let temperature = measure_temperature().await;
                        writer.write_u8(2).await?;
                        writer.write_u8(temperature).await?;
                    }
                    _ => {
                        // Unknown command: Do error handling
                    }
                }                 
            },
            new_temperature = wait_temperature_alarm() => {
                writer.write_u8(2).await?;
                writer.write_u8(new_temperature).await?;
            }
        }
    }
}
```

#### A potential bug

The above program is likely to work well in practice, but it potentially has a subtle bug: if `wait_temperature_alarm`
completes frequently, it may end up saturating the TcpStream send buffer, blocking on `writer.write_u8`.
If the client has somehow managed to fill up its send queue too, the system will deadlock. This may be quite unlikely
for this simple example, but as a system grows more complex, this type of issue becomes more likely.

A similar potential misfeature is that while `set_heater_power` is executing, the alarm feature is not active.

Both these limitations may be perfectly fine, depending on circumstances such as buffer sizes and client behavior. 
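The deadlock scenario can be modeled without any networking at all. In the sketch below, two bounded std channels stand in for the two TCP send buffers; this is an analogy, not the actual socket machinery:

```rust
use std::sync::mpsc::sync_channel;

/// Returns true when both "send buffers" are full and nobody is reading,
/// i.e. the state in which both peers would block on their next write.
fn both_sides_blocked() -> bool {
    // Model each direction's TCP send buffer as a bounded channel of capacity 1.
    let (server_tx, _client_rx) = sync_channel::<u8>(1);
    let (client_tx, _server_rx) = sync_channel::<u8>(1);
    server_tx.send(0).unwrap(); // the server's outgoing buffer fills up
    client_tx.send(0).unwrap(); // the client's fills up too
    // Neither side is reading, so a further *blocking* send on either
    // side would never return: the deadlock described above.
    server_tx.try_send(0).is_err() && client_tx.try_send(0).is_err()
}

fn main() {
    assert!(both_sides_blocked());
}
```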

### Refactoring

Now, let's leave these concerns for a while and consider cleaning up the program a little. Mixing protocol parsing
and logic like this can make the program harder to reason about. So let's abstract the protocol parsing into
separate functions:


```rust

async fn set_heater_power(power: u8) {}
async fn measure_temperature() -> u8 {42}
/// Waits for the temperature to change, then returns the new value
async fn wait_temperature_alarm() -> u8 {42}

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp;
use std::io::Result;
use tokio::select;

enum Command {
    SetPower(u8),
    QueryTemperature,
}
enum Response {
    Temperature(u8)
}

async fn read_command(reader: &mut tcp::ReadHalf<'_>) -> std::io::Result<Command> {
    Ok(match reader.read_u8().await? {
        1 => {
            Command::SetPower(reader.read_u8().await?)
        }
        2 => { // Query temperature
            Command::QueryTemperature
        }
        _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unknown command")),
    })
}
async fn write_response(writer: &mut tcp::WriteHalf<'_>, response: Response) -> std::io::Result<()> {
    match response {
        Response::Temperature(temperature) => {
            writer.write_u8(2).await?;
            writer.write_u8(temperature).await?;
        }
    }
    Ok(())
}


async fn run_server(stream: &mut TcpStream) -> Result<()> {
    let (mut reader, mut writer) = stream.split();
    
    loop {
        select!{
            cmd = read_command(&mut reader) => {
                match cmd? {
                    Command::SetPower(power) => {
                        set_heater_power(power).await;
                    }
                    Command::QueryTemperature => { // Query temperature
                        let temperature = measure_temperature().await;
                        write_response(&mut writer, Response::Temperature(temperature)).await?;
                    }
                }                 
            },
            new_temperature = wait_temperature_alarm() => {
                write_response(&mut writer, Response::Temperature(new_temperature)).await?;
            }
        }
    }
}
```

Nice! Protocol parsing is no longer intertwined with program logic.

However, the program now contains a pretty severe bug. If the temperature changes while a SetPower command is being
read, the future returned by `read_command` will be canceled. But it may already have consumed the first byte of
the 2-byte on-wire packet. In the next iteration of the loop, the 'power' parameter will now be interpreted
as a new command.

The cause of this framing error is that `read_command` is not "cancel safe". For an excellent
article on cancel safety, see: <https://rfd.shared.oxide.computer/rfd/400>.
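The framing error can be reproduced synchronously, without tokio. In the sketch below, cancellation is simulated by simply abandoning a half-finished read; the byte already consumed is gone:

```rust
use std::io::Read;

/// Reads the command byte of a SetPower frame, then "gets cancelled"
/// before reading the power parameter. The consumed byte is lost.
fn cancelled_read_command(cursor: &mut &[u8]) {
    let mut cmd = [0u8; 1];
    cursor.read_exact(&mut cmd).unwrap(); // command byte consumed...
    // ...and the operation is abandoned here, before the power byte is read
}

fn main() {
    // On the wire: SetPower(2), i.e. command byte 1 followed by power byte 2.
    let wire = [1u8, 2];
    let mut cursor = &wire[..];
    cancelled_read_command(&mut cursor);
    // The next read_command sees the leftover power byte and
    // misinterprets it as a QueryTemperature command (value 2).
    let mut next = [0u8; 1];
    cursor.read_exact(&mut next).unwrap();
    assert_eq!(next[0], 2);
}
```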

### Canceling temperature reading 

Let's continue this slightly contrived journey and look at `wait_temperature_alarm`.
Imagine that its innards perform hardware operations like this:

```rust
use tokio::time::Duration;
use tokio::time::sleep;
fn enable_measuring_current() {}
fn disable_measuring_current() {}
fn sample_ad_converter() -> u8 {42}
async fn wait_temperature_alarm() -> u8 {
    loop {
        enable_measuring_current();
        sleep(Duration::from_millis(1)).await;
        let temperature = sample_ad_converter();
        disable_measuring_current();
        if temperature > 100 {
            return temperature;
        }
        sleep(Duration::from_millis(1000)).await;
    }
}
```

Imagine this is measuring some very sensitive chemical process, sending current through a PT100
resistive temperature probe. A precise current is transmitted, and the voltage drop across the actual temperature-sensing
element is measured. However, we only want to enable the current while actually measuring, because the current will
make the probe generate heat, affecting the precision of the measurement.

Now, we notice that it's not great if the above method is canceled. It could result in the current being enabled
while `write_response` is running. If the network is slow, or the client fails to read, or similar, this could
leave the current enabled for an unbounded amount of time. Depending on the chemical process measured, this could 
conceivably be harmless or catastrophic (I'm not a chemist - maybe you can already tell).

To avoid the possibility of this unwanted cancellation, let's modify the main program like this:


```rust

async fn set_heater_power(power: u8) {}
async fn measure_temperature() -> u8 {42}
/// Waits for the temperature to change, then returns the new value
async fn wait_temperature_alarm() -> u8 {42}

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp;
use std::io::Result;
use tokio::select;
use std::pin::pin;

enum Command {
    SetPower(u8),
    QueryTemperature,
}
enum Response {
    Temperature(u8)
}

async fn read_command(reader: &mut tcp::ReadHalf<'_>) -> std::io::Result<Command> {
    todo!() // omitted for brevity
}
async fn write_response(writer: &mut tcp::WriteHalf<'_>, response: Response) -> std::io::Result<()> {
    todo!() // omitted for brevity
}


async fn run_server(stream: &mut TcpStream) -> Result<()> {
    let (mut reader, mut writer) = stream.split();
    
    let mut alarm = pin!(wait_temperature_alarm());
    
    loop {
        select!{
            cmd = read_command(&mut reader) => {
                match cmd? {
                    Command::SetPower(power) => {
                        set_heater_power(power).await;
                    }
                    Command::QueryTemperature => { // Query temperature
                        let temperature = measure_temperature().await;
                        write_response(&mut writer, Response::Temperature(temperature)).await?;
                    }
                }                 
            },
            new_temperature = &mut alarm => {
                write_response(&mut writer, Response::Temperature(new_temperature)).await?;
                alarm.set(wait_temperature_alarm());
            }
        }
    }
}
```

The `alarm` future is now never canceled; instead, it is polled to completion. However, the program, as written,
is still not great: even though the `wait_temperature_alarm` future is never canceled, it is still not scheduled
while `write_response` executes.

### Async code and resource ownership

Now, let's imagine that our hypothetical machine has other features,
and some of these features interfere with the precise temperature measurement. For this reason, we modify
`wait_temperature_alarm` to acquire a lock while performing measurements:

```rust
use tokio::time::{Duration, sleep};

fn enable_measuring_current() {}
fn disable_measuring_current() {}
fn sample_ad_converter() -> u8 {42}
static PRECISE_MEASUREMENT_MUTEX: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

async fn wait_temperature_alarm() -> u8 {
    loop {
        {
            let _guard = PRECISE_MEASUREMENT_MUTEX.lock().await;
            enable_measuring_current();
            sleep(Duration::from_millis(1)).await;
            let temperature = sample_ad_converter();
            disable_measuring_current();
            if temperature > 100 {
                return temperature;
            }
        }
        sleep(Duration::from_millis(1000)).await;
    }
}
```

Imagine that the `measure_temperature` method also acquires `PRECISE_MEASUREMENT_MUTEX`. This can now
potentially deadlock the system. Futures that exist but are not being actively polled are hard to reason
about. In many other languages, the `wait_temperature_alarm` method above would be safe against deadlock
(unless `sample_ad_converter` also grabs the lock, but since it's a method of some hardware abstraction layer,
that can easily be determined not to be the case).

To be clear, the deadlock can happen because once the future created by `wait_temperature_alarm` stops being
polled, it may have executed `PRECISE_MEASUREMENT_MUTEX.lock().await` without yet having
completed `sleep(Duration::from_millis(1)).await`. Nominally, the sleep returns within 1 ms, plus or minus
some jitter. But if the future isn't polled, even the sleep will never complete.
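The failure mode can be sketched with std's `Mutex` standing in for tokio's: a guard owned by a future that is never polled again behaves just like a guard that is never dropped:

```rust
use std::sync::Mutex;

/// Simulates measure_temperature trying to take a lock that the
/// no-longer-polled alarm future still holds. Returns true if stuck.
fn lock_is_stuck(mutex: &Mutex<()>) -> bool {
    // The alarm future locked the mutex, then stopped being polled;
    // its guard therefore stays alive indefinitely:
    let _held_by_unpolled_future = mutex.lock().unwrap();
    // measure_temperature now tries to take the same lock. A blocking
    // lock() here would never return; try_lock makes that visible:
    mutex.try_lock().is_err()
}

fn main() {
    assert!(lock_is_stuck(&Mutex::new(())));
}
```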

## What we've learned

This document has illustrated three related but different problems:

1. The dangers of async cancellation (framing error when `read_command` is canceled)
2. The danger of not processing input (deadlock when client + server both write without reading)
3. The danger of non-polled futures owning resources (PRECISE_MEASUREMENT_MUTEX deadlock)

## How should this be solved?

The example presented in this document may be slightly contrived, but every issue illustrated
can happen in real Rust code and, worse, requires non-trivial, non-local analysis to avoid. The `read_command`
shown isn't buggy in itself; it's just not cancellation safe. Determining whether a piece of async code may
be canceled is not possible with only local information.

The first and third points above can be viewed as being caused directly by a failure to poll futures
to completion.

I'd like to propose the following rule: Futures should always be polled continuously, to completion.

There's an interesting parallel here to aborting threads. The programming community has long since
come to the conclusion that the ability to abort threads "from the outside" causes more harm than benefit.

Rust does not support terminating threads from another thread. Neither does Python.
In C#, the ability has been deprecated for a long time; see
<https://learn.microsoft.com/en-us/dotnet/core/compatibility/core-libraries/5.0/thread-abort-obsolete>.
Java does not support it either: <https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html>

## In the real world

The example presented in this text is simplified. However, the problems illustrated can happen in more
realistic code bases too. Whenever a single task is expected to react to multiple different stimuli
and the code is composed of async methods calling other async methods, these issues can arise.

## Other viewpoints

### Cancellation isn't that bad

It could be argued that cancellation need not be avoided; the programmer just has to ensure that
methods are cancel safe. However, this is often quite difficult in practice: the developer has to
consider the effect of stopping execution at every `.await` point in a cancel-safe method.

### Not polling futures isn't that bad

It could be argued that it's okay to have futures that are not being polled. However, this brings
a similar amount of cognitive overhead: even sleeps and timeouts can no longer be taken for granted,
and the programmer has to assume that any `.await` point may last an unbounded amount of time.

It can be argued that this is the case even if futures are constantly polled; after all, there are no
hard performance guarantees in most Rust environments. However, without keeping track of which futures are polled,
it can be hard to reason about whether a particular program will complete. This is especially true if futures
that are not being polled hold locks, but the same goes for other synchronization primitives. For example, a
future blocked on an mpsc channel send may cause starvation in other parts of the system, and will never
complete if the future isn't being polled.
 


## Using the aselect library

See [aselect](EXAMPLE.md) for an implementation of the above example code using aselect.