rabble 0.4.1

A library for creating location transparent actor based systems
# Introduction
Rabble is useful for building distributed, clustered applications where actors can run on different
[Nodes](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/node.rs) and
communicate over the network. This makes it easier to implement distributed algorithms based
on asynchronous message passing between processes. Actors in rabble are primarily lightweight
[processes](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/process.rs) that
receive and send messages. Thread-based
[services](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/service.rs) provide
a way to run computation-heavy tasks, interact with the file system, or implement an API server,
while retaining the ability to send messages to and receive messages from other processes and services.

This guide shows how to build a distributed system in rabble from the ground up.
The reader will learn how to create a node, join nodes together, spawn processes to act as peers
in a distributed system, and create an API service that allows interaction with that system.

# What are we building?
Our example should be complete enough to show off most features of rabble, while not obscuring the
basics with the complexity of an algorithm implementation. In light of this, we will build a very
simple, and utterly fault-intolerant, replicated counter. The service will run on 3 nodes, with a
replica on each node. The first node is the primary node and runs a TCP server that accepts
requests to either increment the counter or get the current count. When an increment request is
received, it is sent to the primary replica on the same node, which forwards the request to the two
backup replicas and waits for replies from both. Once the primary replica has received both replies,
it sends a message to the TCP server so it can respond to the client. Requests for the
current count are answered directly by the primary replica.

Note that this example is simplified in some major ways, and is an absolutely terrible way to build
a distributed counter. It assumes that:

  1. The network is reliable. Nodes will never become partitioned or lose connectivity.
  2. The network is not asynchronous, and messages are delivered in bounded time. In the world of this
     example, all message communication occurs without delay or timeout.
  3. Nodes will never crash. Replicas will always maintain the same position in the primary/backup
     relationship and will always have up-to-date data.

It probably assumes a bunch more
[fallacies](http://www.lasr.cs.ucla.edu/classes/188_winter15/readings/fallacies.pdf) than those, but
that's enough to show that you shouldn't build a production system in this manner: this is
only an example to explain how to use Rabble.

# Creating your nodes
Each node needs a unique
[NodeId](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/node_id.rs). A node
also needs a message type for the messages sent between actors. All actors can only send and receive
a single message type; you can read more about why
[here](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/doc/architecture.md#messages).
A node can then be started with a call to
[rabble::rouse](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/lib.rs#L80).

```Rust
use rabble::NodeId;

// The message shipped between actors in the system. It must implement these derived traits.
// Serialize and Deserialize provide serialization capability to arbitrary formats.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
enum CounterMsg {
    Increment,
    Ok, // Backup successfully received the Increment
    GetCount,
    Count(usize),
}

let node_ids = create_node_ids(3);

// Each call to rabble::rouse spawns a few threads and returns their `JoinHandle`s along with the
// node. The handles should be joined at some point later in the code. Passing None as the second
// parameter to rouse means "just use the standard logger".
let (nodes, handles) = node_ids.iter().cloned().fold((Vec::new(), Vec::new()), |(mut nodes, mut handles), node_id| {
    let (node, handle_list) = rabble::rouse::<CounterMsg>(node_id, None);
    nodes.push(node);
    handles.extend(handle_list);
    (nodes, handles)
});

/// Create N node ids with names node1,node2,... and unique IP addresses. Don't create more than 9 :D
pub fn create_node_ids(n: usize) -> Vec<NodeId> {
    (1..n + 1).map(|n| {
        NodeId {
            name: format!("node{}", n),
            addr: format!("127.0.0.1:1100{}", n)
        }
    }).collect()
}
```

# Creating and starting 3 replicas

We now have 3 nodes up and running. We want to implement a replica process and then start one on
each node.

First, let's create 3 Pids, one for each process, using the ``node_ids`` created previously. Note that
the `group` member of a pid can be used for a variety of purposes, including multi-tenancy. For now,
let's just leave it blank.

```Rust
    let pids: Vec<Pid> = ["replica1", "replica2", "replica3"].iter().zip(node_ids.iter()).map(|(name, node_id)| {
        Pid {
            name: name.to_string(),
            group: None,
            node: node_id.clone()
        }
    }).collect();
```

Now we need to define our replica type and implement the counter process. Note that the messages
received by a process are of type
[Msg](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/msg.rs), which is
parameterized by `CounterMsg`. This allows receipt of system data as well as user-defined types.
For now, though, we will just concern ourselves with the `User(T)` variant of the `Msg` enum.
Additionally, each message has a corresponding
[CorrelationId](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/correlation_id.rs)
used to match requests with responses. When handling a message, a process should copy the received
correlation id into its outgoing envelopes.

```Rust
pub struct Counter {
    pid: Pid,
    primary: bool,
    backups: Vec<Pid>,
    count: usize,
    output: Vec<Envelope<CounterMsg>>,

    // We have to wait for both backup replies before responding to the client
    backup_replies: HashMap<CorrelationId, usize>
}

impl Counter {
  pub fn new(pid: Pid, primary: Pid, backups: Vec<Pid>) -> Counter {
      // Size the output vector for the expected number of outgoing messages
      let size = if pid == primary {
          2
      } else {
          1
      };

      Counter {
        pid: pid,
        primary: primary == pid,
        backups: backups,
        count: 0,
        output: Vec::with_capacity(size),
        backup_replies: HashMap::new()
      }
  }
}

impl Process for Counter {
    // Each process needs a message type. We defined it above: it's the one we used to parameterize
    // the call to rabble::rouse()
    type Msg = CounterMsg;

    // Each process must implement a single method, `handle`.
    fn handle(&mut self, msg: Msg<CounterMsg>,
              from: Pid,
              correlation_id: Option<CorrelationId>,
              output: &mut Vec<Envelope<CounterMsg>>)
    {
        match msg {
          Msg::User(CounterMsg::Increment) => {
              self.count += 1;
              if self.primary {
                  // Send the increment to the two backups
                  // For now assume correlation_id is a `Some`
                  self.backup_replies.insert(correlation_id.as_ref().unwrap().clone(), 0);
                  for b in &self.backups {
                      let msg = Msg::User(CounterMsg::Increment);
                      let envelope = Envelope::new(b.clone(), self.pid.clone(), msg, correlation_id.clone());
                      output.push(envelope);
                  }
              } else {
                  // Respond to the primary
                  let reply = Msg::User(CounterMsg::Ok);
                  let envelope = Envelope::new(from, self.pid.clone(), reply, correlation_id);
                  output.push(envelope);
              }
          },
          Msg::User(CounterMsg::GetCount) => {
              // Only the primary gets this message
              let reply = Msg::User(CounterMsg::Count(self.count));
              let envelope = Envelope::new(from, self.pid.clone(), reply, correlation_id);
              output.push(envelope);
          },
          Msg::User(CounterMsg::Ok) => {
              // Increment the backup_replies. Once we have received both, reply to the client
              // Do this in a block to limit the borrow scope
              let count = {
                  let count = self.backup_replies.get_mut(correlation_id.as_ref().unwrap()).unwrap();
                  *count += 1;
                  *count
              };

              if count == 2 {
                  self.backup_replies.remove(correlation_id.as_ref().unwrap());
                  // Send to the original requester, not the sender. For now assume the correlation_id
                  // is a Some(id). It has to be for any chained req/response to work properly.
                  let to = correlation_id.as_ref().unwrap().pid.clone();
                  let reply = Msg::User(CounterMsg::Ok);
                  let envelope = Envelope::new(to, self.pid.clone(), reply, correlation_id);
                  output.push(envelope);
              }
          },
          _ => unreachable!()
        }
    }
}
```

Now let's start the replicas so that they can receive and send messages.

```Rust
let primary = pids[0].clone();
let backups = vec![pids[1].clone(), pids[2].clone()];
for (i, pid) in pids.iter().enumerate() {
    // Processes can be any type that implements Process, so create a trait object with Box::new()
    let replica = Box::new(Counter::new(pid.clone(), primary.clone(), backups.clone()));
    // Start the replica on the correct node
    nodes[i].spawn(pid, replica).unwrap();
}
```

# Join the nodes
We need to join the nodes together into a cluster. Note that this operation should most likely be
exposed to the end user via an admin server. For now, though, we are just going to use the
Rabble [Node
API](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/node.rs#L52-L65)
to do the join.

In order to know when the nodes have been joined, we need some way of checking the cluster
state and getting responses back to our requests. Normally this would be done in an admin service,
but for now we can just register a channel for our test and poll on it.

```Rust
nodes[0].join(&nodes[1].id).unwrap();
nodes[0].join(&nodes[2].id).unwrap();

// Create a Pid for our "test service". This is used to register a channel so that we can receive
// responses to requests.
let test_pid = Pid {
    name: "test-runner".to_string(),
    group: None,
    node: node_ids[0].clone()
};

// We create an amy channel so that we can pretend this test is a service.
// We register the sender and our pid with node1 so that we can check the responses to admin calls
// like node.cluster_status().
let mut poller = Poller::new().unwrap();
let (test_tx, test_rx) = poller.get_registrar().channel().unwrap();
nodes[0].register_service(&test_pid, &test_tx).unwrap();

let start = SteadyTime::now();
loop {
    // Create a CorrelationId so that the responses to our requests get sent back on the right channel
    let correlation_id = CorrelationId::pid(test_pid.clone());

    // Send a ClusterStatus request to the cluster server on node1.
    nodes[0].cluster_status(correlation_id).unwrap();

    // Poll on the test channel for a response. We should only get a ClusterStatus response.
    let _ = poller.wait(5000).unwrap();
    let envelope = test_rx.try_recv().unwrap();

    // Match on the msg and see if both backups are currently connected to node1
    if let Msg::ClusterStatus(ClusterStatus{connected, ..}) = envelope.msg {
        if connected.len() == 2 {
            println!("{:#?}", connected);
            println!("Cluster connected in {} ms", (SteadyTime::now() - start).num_milliseconds());
            break;
        }
    }
}
```

# Creating an API Service
Now we have 3 nodes up, with a counter process on each one. We hacked our way through the cluster
setup, but now we want to learn how to build a service so that we can present both admin and API
servers to network clients. Since we've already joined the nodes, we'll focus on building an API
server here. All services must implement the [ServiceHandler
trait](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/service_handler.rs).

Our API service will use 4 byte framed MsgPack encoded messages over TCP and will use the
already built
[TcpServerHandler](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/tcp_server_handler.rs).
This service isolates connections from each other and routes messages to the correct connection.
Connection handlers themselves are user-specified and can be customized for the specific
application. Therefore, instead of writing a service handler directly, we need to implement a
[ConnectionHandler](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/connection_handler.rs).

Each connection handler has 2 message types that must be defined. One is for the actors in the
system, which is the `CounterMsg` we've been using in the rest of the example. The other is the
message sent between the client and the API server. In almost every case these messages will differ,
but for our purposes they can be the same message.

There are 3 callback functions to implement for a `ConnectionHandler`. `new()` is called with
the pid of the service running the service handler (which calls the connection handler) and the
unique id of the connection, for use in correlation ids. ``handle_envelope()`` is called when an
actor message is sent to the connection handler. In general this occurs when a reply to a client
request comes back to the handler. This reply is then bundled into the `ConnectionMsg::Client`
variant and returned so it can be sent back on the client connection. ``handle_network_msg()`` is
called when a new message is received from the client. These requests are packed into envelopes and
returned as `ConnectionMsg::Envelope` variants so they can be routed to actors.

```Rust
pub struct ApiServerConnectionHandler {
    pid: Pid,
    counter_pid: Pid,
    id: usize,
    total_requests: usize
}

impl ConnectionHandler for ApiServerConnectionHandler {
    type Msg = CounterMsg;
    type ClientMsg = CounterMsg;

    fn new(pid: Pid, id: usize) -> ApiServerConnectionHandler {
        let counter_pid = Pid {
            name: "replica1".to_string(),
            group: None,
            node: pid.node.clone()
        };

        ApiServerConnectionHandler {
            pid: pid,
            counter_pid: counter_pid,
            id: id,
            total_requests: 0
        }
    }

    fn handle_envelope(&mut self,
                       envelope: Envelope<CounterMsg>,
                       output: &mut Vec<ConnectionMsg<ApiServerConnectionHandler>>)
    {
        let Envelope {msg, correlation_id, ..} = envelope;
        // Envelopes destined for a connection handler must have a correlation id
        let correlation_id = correlation_id.unwrap();

        match msg {
            Msg::User(counter_msg) =>
                output.push(ConnectionMsg::Client(counter_msg, correlation_id)),

            // Requests can time out as well. Our client message type should contain a Timeout variant.
            Msg::Timeout => ...,

            _ => ... // ignore other messages for now
        }
    }

    fn handle_network_msg(&mut self,
                          msg: CounterMsg,
                          output: &mut Vec<ConnectionMsg<ApiServerConnectionHandler>>)
    {
        // Our client and actor messages are the same, so just forward to the counter process.
        // Note that in a real system, either the counter Pid would be passed in from the client, known
        // a-priori, or learned via an envelope in `handle_envelope`. For now we just know it
        // a-priori.
        let msg = Msg::User(msg);
        let correlation_id = CorrelationId::request(self.pid.clone(), self.id, self.total_requests);
        self.total_requests += 1;
        let envelope = Envelope::new(self.counter_pid.clone(), self.pid.clone(), msg, Some(correlation_id));
        output.push(ConnectionMsg::Envelope(envelope));
    }
}
```

Now that we've created the connection handler for our API server, we need to give the service a Pid and start the server.

```Rust
    let server_pid = Pid {
        name: "api-server".to_string(),
        group: None,
        node: nodes[0].id.clone()
    };

    // Create a TcpServerHandler that listens on "127.0.0.1:11001", has a 5 second request timeout,
    // and no connection timeout.
    let handler: TcpServerHandler<ApiServerConnectionHandler, MsgpackSerializer<CounterMsg>> =
        TcpServerHandler::new(server_pid.clone(), "127.0.0.1:11001", 5000, None);
    let mut service = Service::new(server_pid, nodes[0].clone(), handler).unwrap();

    // Services need to run in their own thread
    let h = thread::spawn(move || {
        service.wait();
    });
```

# Timers

The guide so far has explained how to implement a system using rabble, and it hit all of the major
points. However, by assuming a bounded, reliable network, the example ignored lost and delayed
messages. In reality, distributed systems must account for these by setting a timer for each
request. If the timer expires, the user is alerted of the timeout; whether the request succeeded or
failed is indeterminate. This is an unfortunate fact of nature. Rabble allows users to add timers
for all requests from within a process or service. (Note that the TcpServerHandler automatically
manages request timeouts, so it is unnecessary to use this facility for that purpose.)

Timers are tied to a given process and correlation id, and are specified in milliseconds.
Currently the maximum timer length is 59 minutes, and the minimum timer resolution is 10ms. Timers
under one second are rounded up to the next 10ms, timers of 1 second to 59 seconds are rounded up to
the next second, and timers of 1 minute or more are rounded up to the next minute. This behavior
is based on the hierarchical timer wheel implementation in
[ferris](https://github.com/andrewjstone/ferris).

Additionally, processes may want to return messages or set timers on startup. For this reason, there
is an optional
[init()](https://github.com/andrewjstone/rabble/blob/e1474eda584f3c278322ce21d33d56e6e30f639f/src/process.rs#L12-L14)
callback that can be implemented for processes. The example below shows the implementation of a
simple test process that starts a 100ms timer in `init()` by returning a message destined for
the executor, and then gets a `Msg::Timeout` callback in `handle`.

```Rust
struct TestProcess {
    pid: Pid,
    executor_pid: Option<Pid>,
    output: Vec<Envelope<()>>
}

impl Process for TestProcess {
    type Msg = ();

    fn init(&mut self, executor_pid: Pid) -> Vec<Envelope<()>> {
        self.executor_pid = Some(executor_pid);

        // Start a timer with a 100ms timeout and no correlation id. We don't need one since there is
        // only one timer in this example. In practice timers should almost always have CorrelationIds.
        vec![Envelope::new(self.executor_pid.as_ref().unwrap().clone(),
                           self.pid.clone(),
                           Msg::StartTimer(100),
                           None)]
    }

    fn handle(&mut self,
              msg: Msg<()>,
              from: Pid,
              correlation_id: Option<CorrelationId>,
              output: &mut Vec<Envelope<()>>)
    {
      assert_eq!(from, *self.executor_pid.as_ref().unwrap());
      assert_eq!(msg, Msg::Timeout);
      assert_eq!(correlation_id, None);
    }
}
```