1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
#![allow(clippy::needless_pass_by_ref_mut)] // False positives
use std::{
sync::{OnceLock, Mutex},
time::Duration,
fs,
};
use zeroize::Zeroizing;
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_client::primitives::NetworkId;
use messages::{CoordinatorMessage, ProcessorMessage};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use serai_client::Serai;
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
DockerOperations,
};
#[cfg(test)]
mod tests;
/// Counter shared by every call to `coordinator_stack`.
///
/// Each call hex-encodes the current value as the suffix for its container names and then
/// increments it. A value of 0 additionally marks the first call, which resets the logs directory.
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
/// Build the coordinator image and return a `Composition` for it.
///
/// The composition uses the locally built `serai-dev-coordinator` image (never pulled) and is
/// configured through these environment variables:
/// - `MESSAGE_QUEUE_KEY`: hex encoding of `message_queue_key`'s canonical representation.
/// - `DB_PATH`: fixed to `./coordinator-db`.
/// - `SERAI_KEY`: hex encoding of the first 32 secret bytes of the insecure pair derived from
///   `name`.
/// - `RUST_LOG`: trace-level logging for the coordinator, tributary chain, and tendermint.
pub fn coordinator_instance(
  name: &str,
  message_queue_key: <Ristretto as Ciphersuite>::F,
) -> Composition {
  use serai_client::primitives::insecure_pair_from_name;

  serai_docker_tests::build("coordinator".to_string());

  let image = Image::with_repository("serai-dev-coordinator").pull_policy(PullPolicy::Never);

  let queue_key_hex = hex::encode(message_queue_key.to_repr());
  // Only the first 32 bytes of the secret are handed to the coordinator
  let serai_key_hex =
    hex::encode(&insecure_pair_from_name(name).as_ref().secret.to_bytes()[.. 32]);
  let log_filter =
    ["serai_coordinator=trace,", "tributary_chain=trace,", "tendermint=trace"].concat();

  let env = [
    ("MESSAGE_QUEUE_KEY".to_string(), queue_key_hex),
    ("DB_PATH".to_string(), "./coordinator-db".to_string()),
    ("SERAI_KEY".to_string(), serai_key_hex),
    ("RUST_LOG".to_string(), log_filter),
  ];

  Composition::with_image(image).with_env(env.into())
}
/// Build the Serai node image and return a `Composition` running it on the `local` chain.
///
/// `name` is lowercased and passed to `serai-node` as a `--<name>` flag. All of the container's
/// ports are published.
pub fn serai_composition(name: &str) -> Composition {
  serai_docker_tests::build("serai".to_string());

  let image = Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never);

  // Fixed arguments first, followed by the flag derived from `name`
  let name_flag = format!("--{}", name.to_lowercase());
  let cmd = ["serai-node", "--unsafe-rpc-external", "--rpc-cors", "all", "--chain", "local"]
    .into_iter()
    .map(String::from)
    .chain(std::iter::once(name_flag))
    .collect::<Vec<_>>();

  let mut node = Composition::with_image(image).with_cmd(cmd);
  node.publish_all_ports();
  node
}
/// Container handles for one stack, in the order `coordinator_stack` returns them:
/// (Serai node, message queue, coordinator).
pub type Handles = (String, String, String);
/// Assemble the compositions for one coordinator stack: a Serai node, a message queue, and a
/// coordinator.
///
/// Every container is given a name of the form `<handle>-<id>`, where `<id>` is the hex encoding
/// of a counter which increases with each call. All containers use a strict start policy and
/// forward both log streams to files under `<current dir>/.test-logs/coordinator`. The first call
/// wipes and recreates that directory.
///
/// Returns the three container handles (Serai, message queue, coordinator), the key the message
/// queue instance returned for `NetworkId::Bitcoin`, and the compositions in that same order.
///
/// # Panics
///
/// Panics if the current directory is unavailable or not valid UTF-8, or if the logs directory
/// can't be created, can't be read, or isn't empty after being recreated.
pub fn coordinator_stack(name: &str) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
  let serai = serai_composition(name);
  let (coordinator_key, network_keys, message_queue) = serai_message_queue_tests::instance();
  let coordinator = coordinator_instance(name, coordinator_key);

  // Claim the next counter value as this stack's ID
  // A counter is used as a 8-byte random ID would hit hostname length limits
  let (is_first_stack, suffix) = {
    let mut counter = UNIQUE_ID.get_or_init(|| Mutex::new(0)).lock().unwrap();
    let claimed = *counter;
    *counter += 1;
    (claimed == 0, hex::encode(claimed.to_be_bytes()))
  };

  let logs_dir = {
    let mut dir = std::path::PathBuf::new();
    dir.push(std::env::current_dir().unwrap().to_str().unwrap());
    dir.push(".test-logs");
    dir.push("coordinator");
    dir
  };
  // Only the first stack resets the directory, so later stacks don't erase earlier logs
  if is_first_stack {
    // Removal is best-effort as the directory may not exist yet
    let _ = fs::remove_dir_all(&logs_dir);
    fs::create_dir_all(&logs_dir).expect("couldn't create logs directory");
    let mut leftovers = fs::read_dir(&logs_dir).expect("couldn't read the logs folder");
    assert!(
      leftovers.next().is_none(),
      "logs folder wasn't empty, despite removing it at the start of the run",
    );
  }
  let logs_dir = logs_dir.to_str().unwrap().to_string();

  let mut compositions = Vec::new();
  for base in [serai, message_queue, coordinator] {
    let container_name = format!("{}-{}", base.handle(), &suffix);
    let configured = base
      .with_start_policy(StartPolicy::Strict)
      .with_container_name(container_name)
      .with_log_options(Some(LogOptions {
        action: LogAction::ForwardToFile { path: logs_dir.clone() },
        policy: LogPolicy::Always,
        source: LogSource::Both,
      }));
    compositions.push(configured);
  }

  // Tell the coordinator (the last composition) the names of the two containers before it
  let dependencies = compositions.iter().take(2).map(|c| c.handle()).collect::<Vec<_>>();
  let coordinator = compositions.last_mut().unwrap();
  for (handle, env) in dependencies.into_iter().zip(["SERAI_HOSTNAME", "MESSAGE_QUEUE_RPC"]) {
    coordinator.inject_container_name(handle, env);
  }

  let handles = (compositions[0].handle(), compositions[1].handle(), compositions[2].handle());
  (handles, network_keys[&NetworkId::Bitcoin], compositions)
}
/// Test-side stand-in for a processor.
///
/// It connects to the message queue as `Service::Processor(network)` and exchanges messages with
/// the coordinator through it.
pub struct Processor {
// Network this processor identifies as on the message queue
network: NetworkId,
// `ws://host:port` URL of the Serai node's RPC
serai_rpc: String,
// Handle of the message queue container (stored, but not read)
#[allow(unused)]
message_queue_handle: String,
// Handle of the coordinator container (stored, but not read)
#[allow(unused)]
coordinator_handle: String,
// Incremented once per message sent with `send_message`
next_send_id: u64,
// ID of the next message `recv_message` expects from the queue
next_recv_id: u64,
// Message queue client, constructed with this processor's service identity and key
queue: MessageQueue,
}
impl Processor {
  /// Create a processor for `network`, attached to the stack identified by `handles`.
  ///
  /// `handles` is (Serai, message queue, coordinator). The message queue is reached via the host
  /// mapping of its port 2287, and the Serai RPC via the host mapping of its port 9944.
  ///
  /// This polls the Serai RPC once a second, for at most 60 attempts, until it reports a latest
  /// block hash. If it never does, construction continues regardless.
  ///
  /// # Panics
  ///
  /// Panics if either port isn't mapped to the host.
  pub async fn new(
    network: NetworkId,
    ops: &DockerOperations,
    handles: (String, String, String),
    processor_key: <Ristretto as Ciphersuite>::F,
  ) -> Processor {
    let (serai_handle, message_queue_handle, coordinator_handle) = handles;

    let (queue_host, queue_port) = ops.handle(&message_queue_handle).host_port(2287).unwrap();
    let message_queue_rpc = format!("{}:{}", queue_host, queue_port);

    let (serai_host, serai_port) = ops.handle(&serai_handle).host_port(9944).unwrap();
    let serai_rpc = format!("ws://{}:{}", serai_host, serai_port);

    // Sleep until the Substrate RPC starts, bounding the wait to 60 attempts
    for _ in 0 .. 60 {
      tokio::time::sleep(Duration::from_secs(1)).await;
      if let Ok(client) = Serai::new(&serai_rpc).await {
        if client.get_latest_block_hash().await.is_ok() {
          break;
        }
      }
    }
    // The Serai RPC may or may not have started at this point
    // Assume it has and continue, so if it's a few seconds late, it's still within tolerance

    let queue = MessageQueue::new(
      Service::Processor(network),
      message_queue_rpc,
      Zeroizing::new(processor_key),
    );

    Processor {
      network,
      serai_rpc,
      message_queue_handle,
      coordinator_handle,
      next_send_id: 0,
      next_recv_id: 0,
      queue,
    }
  }

  /// Open a new connection to the Serai node's RPC.
  ///
  /// # Panics
  ///
  /// Panics if the connection can't be established.
  pub async fn serai(&self) -> Serai {
    Serai::new(&self.serai_rpc).await.unwrap()
  }

  /// Send a message to the coordinator as this processor.
  ///
  /// The message is serialized as JSON and queued from `Service::Processor(network)` to
  /// `Service::Coordinator`.
  pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
    let msg: ProcessorMessage = msg.into();
    let metadata = Metadata {
      from: Service::Processor(self.network),
      to: Service::Coordinator,
      intent: msg.intent(),
    };
    let payload = serde_json::to_string(&msg).unwrap().into_bytes();
    self.queue.queue(metadata, payload).await;
    self.next_send_id += 1;
  }

  /// Receive the next message from the coordinator as this processor, acknowledging it.
  ///
  /// # Panics
  ///
  /// Panics if no message arrives within 10 seconds, if the message isn't from the coordinator,
  /// if its ID isn't the next expected ID, or if it fails to deserialize.
  pub async fn recv_message(&mut self) -> CoordinatorMessage {
    let expected_id = self.next_recv_id;
    let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(expected_id))
      .await
      .unwrap();
    assert_eq!(msg.from, Service::Coordinator);
    assert_eq!(msg.id, expected_id);
    self.queue.ack(expected_id).await;
    self.next_recv_id = expected_id + 1;
    serde_json::from_slice(&msg.msg).unwrap()
  }
}
*/