proc_graph/
lib.rs

1//! A small library which converts a process graph into a set of communicating processes.
2//! ```
3//! use std::{thread, time::Duration};
4//!
5//! use proc_graph::Network;
6//!
7//!     env_logger::init();
8//!
9//!     let mut net = Network::new();
10//!
11//!     net.add_process("a", vec!["b", "c"], |senders, _| loop {
12//!         thread::sleep(Duration::from_secs(1));
13//!         for (adj, s) in senders.iter() {
14//!             println!("a is sending to {}", adj);
15//!             s.send(("a".to_string(), ()))
16//!                 .expect("shouldn't encounter a closed channel");
17//!         }
18//!     });
19//!
20//!     net.add_process("b", vec!["d"], |senders, receiver| loop {
21//!         thread::sleep(Duration::from_secs(1));
22//!         let (sender, _) = receiver
23//!             .recv()
24//!             .expect("shouldn't encounter a closed channel");
25//!         println!("b received from {}", sender);
26//!         for s in senders.values() {
27//!             s.send(("b".to_string(), ()))
28//!                 .expect("shouldn't encounter a closed channel");
29//!         }
30//!     });
31//!
32//!     net.add_process("c", vec!["d"], |senders, receiver| loop {
33//!         thread::sleep(Duration::from_secs(1));
34//!         let (sender, _) = receiver
35//!             .recv()
36//!             .expect("shouldn't encounter a closed channel");
37//!         println!("c received from {}", sender);
38//!         for s in senders.values() {
39//!             s.send(("c".to_string(), ()))
40//!                 .expect("shouldn't encounter a closed channel");
41//!         }
42//!     });
43//!
44//!     net.add_process("d", vec![], |_, receiver| loop {
45//!         thread::sleep(Duration::from_secs(1));
46//!         let (sender, _) = receiver
47//!             .recv()
48//!             .expect("shouldn't encounter a closed channel");
49//!         println!("d received from {}", sender);
50//!     });
51//!
52//!     net.start();
53//! ```
54use std::{
55    collections::HashMap,
56    sync::mpsc::{self, Receiver, Sender},
57    thread::{self, JoinHandle},
58};
59
60/// A network of processes. Each process has a mailbox and the set of
61/// addresses (senders) it needs to communicate with its neighbors.
62#[derive(Default)]
63pub struct Network<T> {
64    procs: HashMap<String, Process<T>>,
65}
66
67impl<T> Network<T>
68where
69    T: Default + Send + 'static,
70{
71    pub fn new() -> Self {
72        Network {
73            procs: HashMap::new(),
74        }
75    }
76
77    /// Adding a process to the network takes a name, its neighbors,
78    /// and a closure which acts as _the entire body_ of this
79    /// process. I.e., if you want the process to run forever, that
80    /// loop has to be _inside_ the given closure.
81    ///
82    /// The process's neighbor's addresses (the senders) and the
83    /// process's mailbox are passed into the closure. The only
84    /// abstraction provided here is converting the given
85    /// graph—described the process and its adjacencies—into mailboxes
86    /// and addresses.
87    pub fn add_process<'a, F>(&'a mut self, name: &'static str, adj: Vec<&'static str>, body: F)
88    where
89        F: Fn(HashMap<String, Sender<(String, T)>>, Receiver<(String, T)>) + Send + 'static,
90    {
91        let mut proc = {
92            if let Some(mut p) = self.procs.remove(name) {
93                p.adj = adj.into_iter().map(|a| a.to_string()).collect();
94                p.body = Some(Box::new(body));
95                p
96            } else {
97                let (s, r) = mpsc::channel();
98                Process {
99                    adj: adj.into_iter().map(|a| a.to_string()).collect(),
100                    senders: HashMap::new(),
101                    self_sender: s,
102                    receiver: r,
103                    body: Some(Box::new(body)),
104                }
105            }
106        };
107        for adj in proc.adj.iter() {
108            let recv_proc = {
109                if let Some(p) = self.procs.remove(adj) {
110                    p
111                } else {
112                    let (s, r) = mpsc::channel();
113                    Process {
114                        adj: Vec::new(),
115                        senders: HashMap::new(),
116                        self_sender: s,
117                        receiver: r,
118                        body: None,
119                    }
120                }
121            };
122            proc.senders
123                .insert(adj.to_string(), recv_proc.self_sender.clone());
124            self.procs.insert(adj.clone(), recv_proc);
125        }
126        self.procs.insert(name.to_string(), proc);
127    }
128
129    /// Start the processes in this network. Blocks until all
130    /// of the processes have finished.
131    pub fn start_and_wait(self) {
132        let mut handles = Vec::new();
133        for (_, proc) in self.procs.into_iter() {
134            handles.push(proc.run());
135        }
136        for handle in handles {
137            handle.join().expect("joining running threads");
138        }
139    }
140
141    /// Start the processes in this network.
142    pub fn start(self) {
143        for (_, proc) in self.procs.into_iter() {
144            proc.run();
145        }
146    }
147}
148
149type ProcessBody<T> = Option<
150    Box<dyn Fn(HashMap<String, Sender<(String, T)>>, Receiver<(String, T)>) + Send + 'static>,
151>;
152
153struct Process<T> {
154    adj: Vec<String>,
155    body: ProcessBody<T>,
156    senders: HashMap<String, Sender<(String, T)>>,
157    self_sender: Sender<(String, T)>,
158    receiver: Receiver<(String, T)>,
159}
160
161impl<T> Process<T>
162where
163    T: Default + Send + 'static,
164{
165    fn run(self) -> JoinHandle<()> {
166        thread::spawn(move || {
167            let body = self
168                .body
169                .expect("a graph should be complete before running its processes");
170            body(self.senders, self.receiver);
171        })
172    }
173}