1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//@ wasmtime-flags = '-Wcomponent-model-async'
include!(env!("BINDINGS"));
use crate::my::test::i::*;
use futures::task::noop_waker_ref;
use std::future::Future;
use std::task::Context;
use wit_bindgen::yield_async;
// Unit struct with no state; it exists only to carry the `Guest`
// implementation defined further down in this file.
struct Component;
// Pass `Component` to the `export!` macro.
// NOTE(review): `export!` is not defined in this file — presumably it comes
// from the bindings pulled in by `include!(env!("BINDINGS"))` and registers
// `Component` as the implementation of the component's exports; confirm.
export!(Component);
// `Guest` implementation for this test component. `run` walks through five
// scenarios, one per `println!` banner, each of which drops (and thereby
// cancels) the future returned by the imported `pending_import` at a different
// point in its lifecycle.
//
// Every scenario creates a `wit_future::new` pair: `rx` is handed to
// `pending_import`, while `tx` is kept so the test can afterwards check whether
// a write succeeds (`unwrap`) or fails (`unwrap_err`).
// NOTE(review): the `|| unreachable!()` closure passed to `wit_future::new` is
// presumably a fallback-value producer that must never run in this test —
// confirm against the wit-bindgen documentation.
impl Guest for Component {
// Runs all scenarios in order. Any failed `assert!`, `unwrap` or `unwrap_err`
// panics, which is how this test reports failure.
async fn run() {
// Scenario 1: poll the import once, then drop it while it is still pending.
println!("test cancelling an import in progress");
{
let (tx, rx) = wit_future::new(|| unreachable!());
let mut import = Box::pin(pending_import(rx));
// Poll exactly once with a no-op waker instead of awaiting, so the import
// is driven a single step and nothing will wake us; it must still be
// pending afterwards.
assert!(import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// Dropping the future is what cancels the import.
drop(import);
// The write must now fail (`unwrap_err`): the import that was holding
// `rx` has been cancelled.
tx.write(()).await.unwrap_err();
}
// Scenario 2: drop the import without ever polling it.
println!("test cancelling an import before it starts");
{
let (tx, rx) = wit_future::new(|| unreachable!());
let import = Box::pin(pending_import(rx));
// Never polled, so the future is dropped before it was driven at all.
drop(import);
// As above, the write is expected to fail once the import is gone.
tx.write(()).await.unwrap_err();
}
// Scenario 3: cancel one import that was polled while backpressure was on,
// alongside one that was polled before backpressure was enabled.
println!("test cancelling an import in the started state");
{
let (tx1, rx1) = wit_future::new(|| unreachable!());
let (tx2, rx2) = wit_future::new(|| unreachable!());
// create a task in the "started" state, but don't complete it yet
let mut started_import = Box::pin(pending_import(rx1));
assert!(started_import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// Ask the other component to set its backpressure flag, meaning we
// won't be able to create new tasks in the "started" state.
backpressure_set(true);
let mut starting_import = Box::pin(pending_import(rx2));
assert!(starting_import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// Now cancel the "starting" import. This should notably drop handles in
// arguments since they get re-acquired during cancellation
drop(starting_import);
// cancel the in-progress ("started") import as well
drop(started_import);
// Turn backpressure back off so later scenarios start from a clean state.
backpressure_set(false);
// both channels should be dropped
tx1.write(()).await.unwrap_err();
tx2.write(()).await.unwrap_err();
}
// Race an import's cancellation with a status code saying it's done.
println!("test cancellation with a status code saying it's done");
{
// Start a subtask and get it into the "started" state
let (tx, rx) = wit_future::new(|| unreachable!());
let mut import = Box::pin(pending_import(rx));
assert!(import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// Complete the subtask, but don't see the completion in Rust yet.
// Unlike the earlier scenarios this write must succeed (`unwrap`), and
// `import` is deliberately not polled or awaited again afterwards.
tx.write(()).await.unwrap();
// Let the subtask's completion notification make its way to our task
// here.
// NOTE(review): five yields looks like an arbitrary "enough" count rather
// than a precise requirement — confirm.
for _ in 0..5 {
yield_async().await;
}
// Now cancel the import, despite it actually being done. This should
// realize that the cancellation is racing completion.
drop(import);
}
// Race an import's cancellation with a pending status code indicating that
// it's transitioning from started => returned.
println!("race cancellation with pending status code");
{
// Start a subtask and get it into the "started" state
let (tx1, rx1) = wit_future::new(|| unreachable!());
let mut started_import = Box::pin(pending_import(rx1));
assert!(started_import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// force the next subtask to start out in the "starting" state, not the
// "started" state.
backpressure_set(true);
let (tx2, rx2) = wit_future::new(|| unreachable!());
let mut starting_import = Box::pin(pending_import(rx2));
assert!(starting_import
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
// Disable backpressure in the other component which will let the
// `starting_import`, previously in the "STARTING" state, get a queued up
// notification that it's entered the "STARTED" state.
backpressure_set(false);
// Yield without polling `starting_import`, so that notification stays
// queued rather than being observed by the future.
for _ in 0..5 {
yield_async().await;
}
// Now cancel the `starting_import`. This should correctly pick up the
// STARTING => STARTED state transition and handle that correctly.
drop(starting_import);
// Our future to the import we cancelled shouldn't be able to complete
// its write.
tx2.write(()).await.unwrap_err();
// Complete the other import normally just to assert that it's not
// cancelled and able to proceed as usual.
tx1.write(()).await.unwrap();
// Await the surviving import to completion now that its write succeeded.
started_import.await;
}
}
}