muchin 0.1.0

Support for composing large, interacting, complicated state machines
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
use super::{
    action::EchoClientAction,
    state::{EchoClientState, EchoClientStatus},
};
use crate::automaton::{
    Dispatcher, ModelState, PureModel, RegisterModel, RunnerBuilder, State, Timeout, Uid,
};
use crate::{
    callback,
    models::pure::{
        net::{
            tcp::action::{TcpAction, TcpPollEvents},
            tcp_client::{action::TcpClientAction, state::TcpClientState},
        },
        prng::state::PRNGState,
        tests::echo_client::state::EchoClientConfig,
        time::model::update_time,
    },
};
use core::panic;
use log::{info, warn};
use rand::{Rng, RngCore};

// The `EchoClientState` acts as a simulated echo client, used for testing the
// functionality of the state-machine and its related models (`TcpClientState`,
// `TcpState`, `MioState`, `TimeState`). The echo client communicates with an
// echo server, which sends back any data it receives from the client.
//
// The `PureModel` implementation of the `EchoClientState` model processes
// `EchoClientAction::Tick` actions that are dispatched on each "tick" of the
// state-machine loop.
//
// During each "tick", the model performs two key tasks:
//
// 1. Updates the current time tracked by the state-machine.
//
// 2. Checks if the TCP client is ready. If it's not, the model initializes
//    the TCP client. If it is ready, a poll action is dispatched.
//
// The rest of the model's logic handles other action variants that:
//
// - Completes the initialization of the TCP client and connects it to the
//   echo server. If the connection request fails, the client makes up to
//   `max_connection_attempts` attempts to reconnect.
//   If this limit is exceeded, the client panics.
//
// - For each poll result the client sends random data to the echo server.
//   The size and content of this data are randomly generated using the
//   `PRNGState` model.
//
// - After sending data, the client dispatches a receive action to read the
//   server's response. A random timeout is generated using the `PRNGState`
//   model to simulate different network conditions.
//
// - When it receives data from the server, the client checks if the received
//   data matches the sent data. If not, the client panics.
//

// This model depends on `PRNGState` and `TcpClientState`.
impl RegisterModel for EchoClientState {
    fn register<Substate: ModelState>(builder: RunnerBuilder<Substate>) -> RunnerBuilder<Substate> {
        builder
            .register::<PRNGState>()
            .register::<TcpClientState>()
            .model_pure::<Self>()
    }
}

impl PureModel for EchoClientState {
    type Action = EchoClientAction;

    fn process_pure<Substate: ModelState>(
        state: &mut State<Substate>,
        action: Self::Action,
        dispatcher: &mut Dispatcher,
    ) {
        match action {
            EchoClientAction::Tick => {
                // Top-most model first task is to update the state-machine time.
                if update_time(state, dispatcher) {
                    // The next `EchoClientAction::Tick` will have the updated time.
                    return;
                }

                let EchoClientState {
                    status,
                    config: EchoClientConfig { poll_timeout, .. },
                    ..
                } = state.substate_mut();

                match status {
                    EchoClientStatus::Init => {
                        // Init TCP model
                        dispatcher.dispatch(TcpAction::Init {
                            instance: state.new_uid(),
                            on_success: callback!(|instance: Uid| EchoClientAction::InitSuccess { instance }),
                            on_error: callback!(|(instance: Uid, error: String)| EchoClientAction::InitError { instance, error }),
                        })
                    }
                    EchoClientStatus::Connecting
                    | EchoClientStatus::Connected { .. }
                    | EchoClientStatus::Sending { .. }
                    | EchoClientStatus::Receiving { .. } => {
                        let timeout = Timeout::Millis(*poll_timeout);
                        // If the client is already initialized then we poll on each "tick".
                        dispatcher.dispatch(TcpClientAction::Poll {
                            uid: state.new_uid(),
                            timeout,
                            on_success: callback!(|(uid: Uid, events: TcpPollEvents)| EchoClientAction::PollSuccess { uid, events }),
                            on_error: callback!(|(uid: Uid, error: String)| EchoClientAction::PollError { uid, error }),
                        })
                    }
                }
            }
            EchoClientAction::InitSuccess { .. } => {
                let connection = state.new_uid();
                let client_state: &mut EchoClientState = state.substate_mut();

                client_state.status = EchoClientStatus::Connecting;
                connect(client_state, connection, dispatcher);
            }
            EchoClientAction::InitError { error, .. } => {
                panic!("Client initialization failed: {}", error)
            }
            EchoClientAction::ConnectSuccess { connection } => {
                let EchoClientState {
                    status,
                    connection_attempt,
                    ..
                } = state.substate_mut();

                if let EchoClientStatus::Connecting = status {
                    *status = EchoClientStatus::Connected { connection };
                    *connection_attempt = 0;
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::ConnectTimeout { connection } => {
                let new_connection_uid = state.new_uid();
                let EchoClientState {
                    status,
                    connection_attempt,
                    config:
                        EchoClientConfig {
                            max_connection_attempts,
                            ..
                        },
                    ..
                } = state.substate_mut();

                if let EchoClientStatus::Connecting = status {
                    *connection_attempt += 1;

                    warn!(
                        "|ECHO_CLIENT| connection {:?} timeout, reconnection attempt {}",
                        connection, connection_attempt
                    );

                    assert!(connection_attempt < max_connection_attempts);
                    connect(state.substate_mut(), new_connection_uid, dispatcher);
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::ConnectError { connection, error } => {
                let new_connection_uid = state.new_uid();
                let EchoClientState {
                    status,
                    connection_attempt,
                    config:
                        EchoClientConfig {
                            max_connection_attempts,
                            ..
                        },
                    ..
                } = state.substate_mut();

                if let EchoClientStatus::Connecting = status {
                    *connection_attempt += 1;

                    warn!(
                        "|ECHO_CLIENT| connection {:?} error: {}, reconnection attempt {}",
                        connection, error, connection_attempt
                    );

                    assert!(connection_attempt < max_connection_attempts);
                    connect(state.substate_mut(), new_connection_uid, dispatcher);
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::CloseEvent { connection } => {
                info!("|ECHO_CLIENT| connection {:?} closed", connection);

                let new_connection_uid = state.new_uid();
                let client_state: &mut EchoClientState = state.substate_mut();

                client_state.status = EchoClientStatus::Connecting;
                connect(client_state, new_connection_uid, dispatcher);
            }
            EchoClientAction::PollSuccess { .. } => {
                // Send random data on every poll if there are no pending send/recv requests.
                if let EchoClientState {
                    status: EchoClientStatus::Connected { connection },
                    config: EchoClientConfig { max_send_size, .. },
                    ..
                } = state.substate()
                {
                    let connection = *connection;
                    let max_send_size = *max_send_size;
                    let request = state.new_uid();
                    let prng: &mut PRNGState = state.substate_mut();
                    let random_size = prng.rng.random_range(1..max_send_size) as usize;
                    let mut data: Vec<u8> = vec![0; random_size];

                    prng.rng.fill_bytes(&mut data[..]);

                    state.substate_mut::<EchoClientState>().status = EchoClientStatus::Sending {
                        connection,
                        request,
                        data: data.clone(),
                    };

                    dispatcher.dispatch(TcpClientAction::Send {
                        uid: request,
                        connection,
                        data: data.into(),
                        timeout: Timeout::Millis(200),
                        on_success: callback!(|uid: Uid| EchoClientAction::SendSuccess { uid }),
                        on_timeout: callback!(|uid: Uid| EchoClientAction::SendTimeout { uid }),
                        on_error: callback!(|(uid: Uid, error: String)| EchoClientAction::SendError { uid, error })
                    });
                }
            }
            EchoClientAction::PollError { uid, error } => {
                panic!("Poll {:?} failed: {}", uid, error)
            }
            EchoClientAction::SendSuccess { uid } => {
                // Receive back what we sent
                if let EchoClientState {
                    status:
                        EchoClientStatus::Sending {
                            connection,
                            request,
                            data,
                        },
                    config:
                        EchoClientConfig {
                            min_rnd_timeout,
                            max_rnd_timeout,
                            ..
                        },
                    ..
                } = state.substate()
                {
                    assert_eq!(uid, *request);
                    let connection = *connection;
                    let sent_data = data.clone();
                    let count = data.len();

                    // We randomize client's recv timeout to force it fail sometimes
                    let timeout_range = *min_rnd_timeout..*max_rnd_timeout;
                    let prng: &mut PRNGState = state.substate_mut();
                    let timeout = Timeout::Millis(prng.rng.random_range(timeout_range));

                    let request = state.new_uid();

                    info!(
                        "|ECHO_CLIENT| dispatching recv request {:?} ({} bytes) from connection {:?} with timeout {:?}",
                        request, count, connection, timeout
                    );

                    state.substate_mut::<EchoClientState>().status = EchoClientStatus::Receiving {
                        connection,
                        request,
                        sent_data,
                    };

                    dispatcher.dispatch(TcpClientAction::Recv {
                        uid: request,
                        connection,
                        count,
                        timeout,
                        on_success: callback!(|(uid: Uid, data: Vec<u8>)| EchoClientAction::RecvSuccess { uid, data }),
                        on_timeout: callback!(|(uid: Uid, partial_data: Vec<u8>)| EchoClientAction::RecvTimeout { uid, partial_data }),
                        on_error: callback!(|(uid: Uid, error: String)| EchoClientAction::RecvError { uid, error }),
                    });
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::SendTimeout { uid } => {
                if let EchoClientState {
                    status: EchoClientStatus::Sending { connection, .. },
                    ..
                } = state.substate()
                {
                    let connection = *connection;
                    warn!(
                        "|ECHO_CLIENT| send {:?} timeout to connection {:?}",
                        uid, connection
                    );
                    dispatcher.dispatch(TcpClientAction::Close { connection })
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::SendError { uid, error } => {
                if let EchoClientState {
                    status: EchoClientStatus::Sending { connection, .. },
                    ..
                } = state.substate()
                {
                    warn!(
                        "|ECHO_CLIENT| send {:?} to connection {:?} error: {}",
                        uid, connection, error
                    );
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::RecvSuccess { uid, data } => {
                if let EchoClientState {
                    status:
                        EchoClientStatus::Receiving {
                            connection,
                            request,
                            sent_data,
                        },
                    ..
                } = state.substate()
                {
                    assert_eq!(uid, *request);
                    let connection = *connection;

                    if *sent_data != data {
                        panic!("Data mismatch: {:?} != {:?}", sent_data, data)
                    }

                    state.substate_mut::<EchoClientState>().status =
                        EchoClientStatus::Connected { connection };

                    info!(
                        "|ECHO_CLIENT| recv {:?} from connection {:?}, data matches.",
                        uid, connection
                    );
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::RecvTimeout { uid, .. } => {
                if let EchoClientState {
                    status:
                        EchoClientStatus::Receiving {
                            connection,
                            request,
                            ..
                        },
                    ..
                } = state.substate()
                {
                    assert_eq!(uid, *request);
                    let connection = *connection;

                    warn!(
                        "|ECHO_CLIENT| recv {:?} timeout from connection {:?}",
                        uid, connection
                    );
                    dispatcher.dispatch(TcpClientAction::Close { connection })
                } else {
                    unreachable!()
                }
            }
            EchoClientAction::RecvError { uid, error } => {
                if let EchoClientState {
                    status:
                        EchoClientStatus::Receiving {
                            connection,
                            request,
                            ..
                        },
                    ..
                } = state.substate()
                {
                    assert_eq!(uid, *request);
                    let connection = *connection;

                    warn!(
                        "|ECHO_CLIENT| recv {:?} from connection {:?} error: {}",
                        uid, connection, error
                    );
                } else {
                    unreachable!()
                }
            }
        }
    }
}

fn connect(client_state: &EchoClientState, connection: Uid, dispatcher: &mut Dispatcher) {
    let EchoClientState {
        config:
            EchoClientConfig {
                connect_to_address,
                connect_timeout,
                ..
            },
        ..
    } = client_state;

    dispatcher.dispatch(TcpClientAction::Connect {
        connection,
        address: connect_to_address.clone(),
        timeout: connect_timeout.clone(),
        on_success: callback!(|connection: Uid| EchoClientAction::ConnectSuccess { connection }),
        on_timeout: callback!(|connection: Uid| EchoClientAction::ConnectTimeout { connection }),
        on_error: callback!(|(connection: Uid, error: String)| EchoClientAction::ConnectError { connection, error }),
        on_close: callback!(|connection: Uid| EchoClientAction::CloseEvent { connection })
    });
}