proc_graph/lib.rs
1//! A small library which converts a process graph into a set of communicating processes.
2//! ```
3//! use std::{thread, time::Duration};
4//!
5//! use proc_graph::Network;
6//!
7//! env_logger::init();
8//!
9//! let mut net = Network::new();
10//!
11//! net.add_process("a", vec!["b", "c"], |senders, _| loop {
12//! thread::sleep(Duration::from_secs(1));
13//! for (adj, s) in senders.iter() {
14//! println!("a is sending to {}", adj);
15//! s.send(("a".to_string(), ()))
16//! .expect("shouldn't encounter a closed channel");
17//! }
18//! });
19//!
20//! net.add_process("b", vec!["d"], |senders, receiver| loop {
21//! thread::sleep(Duration::from_secs(1));
22//! let (sender, _) = receiver
23//! .recv()
24//! .expect("shouldn't encounter a closed channel");
25//! println!("b received from {}", sender);
26//! for s in senders.values() {
27//! s.send(("b".to_string(), ()))
28//! .expect("shouldn't encounter a closed channel");
29//! }
30//! });
31//!
32//! net.add_process("c", vec!["d"], |senders, receiver| loop {
33//! thread::sleep(Duration::from_secs(1));
34//! let (sender, _) = receiver
35//! .recv()
36//! .expect("shouldn't encounter a closed channel");
37//! println!("c received from {}", sender);
38//! for s in senders.values() {
39//! s.send(("c".to_string(), ()))
40//! .expect("shouldn't encounter a closed channel");
41//! }
42//! });
43//!
44//! net.add_process("d", vec![], |_, receiver| loop {
45//! thread::sleep(Duration::from_secs(1));
46//! let (sender, _) = receiver
47//! .recv()
48//! .expect("shouldn't encounter a closed channel");
49//! println!("d received from {}", sender);
50//! });
51//!
52//! net.start();
53//! ```
54use std::{
55 collections::HashMap,
56 sync::mpsc::{self, Receiver, Sender},
57 thread::{self, JoinHandle},
58};
59
60/// A network of processes. Each process has a mailbox and the set of
61/// addresses (senders) it needs to communicate with its neighbors.
62#[derive(Default)]
63pub struct Network<T> {
64 procs: HashMap<String, Process<T>>,
65}
66
67impl<T> Network<T>
68where
69 T: Default + Send + 'static,
70{
71 pub fn new() -> Self {
72 Network {
73 procs: HashMap::new(),
74 }
75 }
76
77 /// Adding a process to the network takes a name, its neighbors,
78 /// and a closure which acts as _the entire body_ of this
79 /// process. I.e., if you want the process to run forever, that
80 /// loop has to be _inside_ the given closure.
81 ///
82 /// The process's neighbor's addresses (the senders) and the
83 /// process's mailbox are passed into the closure. The only
84 /// abstraction provided here is converting the given
85 /// graph—described the process and its adjacencies—into mailboxes
86 /// and addresses.
87 pub fn add_process<'a, F>(&'a mut self, name: &'static str, adj: Vec<&'static str>, body: F)
88 where
89 F: Fn(HashMap<String, Sender<(String, T)>>, Receiver<(String, T)>) + Send + 'static,
90 {
91 let mut proc = {
92 if let Some(mut p) = self.procs.remove(name) {
93 p.adj = adj.into_iter().map(|a| a.to_string()).collect();
94 p.body = Some(Box::new(body));
95 p
96 } else {
97 let (s, r) = mpsc::channel();
98 Process {
99 adj: adj.into_iter().map(|a| a.to_string()).collect(),
100 senders: HashMap::new(),
101 self_sender: s,
102 receiver: r,
103 body: Some(Box::new(body)),
104 }
105 }
106 };
107 for adj in proc.adj.iter() {
108 let recv_proc = {
109 if let Some(p) = self.procs.remove(adj) {
110 p
111 } else {
112 let (s, r) = mpsc::channel();
113 Process {
114 adj: Vec::new(),
115 senders: HashMap::new(),
116 self_sender: s,
117 receiver: r,
118 body: None,
119 }
120 }
121 };
122 proc.senders
123 .insert(adj.to_string(), recv_proc.self_sender.clone());
124 self.procs.insert(adj.clone(), recv_proc);
125 }
126 self.procs.insert(name.to_string(), proc);
127 }
128
129 /// Start the processes in this network. Blocks until all
130 /// of the processes have finished.
131 pub fn start_and_wait(self) {
132 let mut handles = Vec::new();
133 for (_, proc) in self.procs.into_iter() {
134 handles.push(proc.run());
135 }
136 for handle in handles {
137 handle.join().expect("joining running threads");
138 }
139 }
140
141 /// Start the processes in this network.
142 pub fn start(self) {
143 for (_, proc) in self.procs.into_iter() {
144 proc.run();
145 }
146 }
147}
148
149type ProcessBody<T> = Option<
150 Box<dyn Fn(HashMap<String, Sender<(String, T)>>, Receiver<(String, T)>) + Send + 'static>,
151>;
152
153struct Process<T> {
154 adj: Vec<String>,
155 body: ProcessBody<T>,
156 senders: HashMap<String, Sender<(String, T)>>,
157 self_sender: Sender<(String, T)>,
158 receiver: Receiver<(String, T)>,
159}
160
161impl<T> Process<T>
162where
163 T: Default + Send + 'static,
164{
165 fn run(self) -> JoinHandle<()> {
166 thread::spawn(move || {
167 let body = self
168 .body
169 .expect("a graph should be complete before running its processes");
170 body(self.senders, self.receiver);
171 })
172 }
173}