pub mod cleanup;
pub mod exec;
pub mod registration;
mod util;

use std::{
    borrow::Cow,
    collections::HashMap,
    ffi::{OsStr, OsString},
    io::Write,
    path::PathBuf,
    str::FromStr,
};

use anyhow::{anyhow, Context, Error};
use bollard::{
    auth::DockerCredentials,
    container::{
        Config as BollardContainerConfig, CreateContainerOptions, ListContainersOptions,
        LogsOptions, StartContainerOptions, WaitContainerOptions,
    },
    exec::{CreateExecOptions, StartExecOptions},
    models::{
        EndpointSettings, HostConfig, HostConfigLogConfig, PortBinding, RestartPolicy,
        RestartPolicyNameEnum,
    },
    network::{ConnectNetworkOptions, CreateNetworkOptions, ListNetworksOptions},
    Docker,
};
use cleanup::{Cleanup, Disarm};
use futures_util::stream::StreamExt;
use itertools::Itertools;
use lazy_static::lazy_static;
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio_util::codec::{BytesCodec, FramedRead};
use typed_builder::TypedBuilder;

use registration::{handle_user_registration, User};

use crate::{
    exec::{CommandExt, Executor},
    util::YamlExt,
};

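// Names of the environment variables that mx-tester passes to the scripts it
// runs (see `Config::shared_env_variables`).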
lazy_static! {
    static ref MX_TEST_MODULE_DIR: OsString = OsString::from_str("MX_TEST_MODULE_DIR").unwrap();

    static ref MX_TEST_SYNAPSE_DIR: OsString = OsString::from_str("MX_TEST_SYNAPSE_DIR").unwrap();

    static ref MX_TEST_SCRIPT_TMPDIR: OsString = OsString::from_str("MX_TEST_SCRIPT_TMPDIR").unwrap();

    static ref MX_TEST_CWD: OsString = OsString::from_str("MX_TEST_CWD").unwrap();

    static ref MX_TEST_WORKERS_ENABLED: OsString = OsString::from_str("MX_TEST_WORKERS_ENABLED").unwrap();

    static ref MX_TEST_NETWORK_NAME: OsString = OsString::from_str("MX_TEST_NETWORK_NAME").unwrap();

    static ref MX_TEST_SETUP_CONTAINER_NAME: OsString = OsString::from_str("MX_TEST_SETUP_CONTAINER_NAME").unwrap();

    static ref MX_TEST_UP_RUN_DOWN_CONTAINER_NAME: OsString = OsString::from_str("MX_TEST_UP_RUN_DOWN_CONTAINER_NAME").unwrap();
}

/// Memory to reserve for the Synapse container (4 GiB).
const MEMORY_ALLOCATION_BYTES: i64 = 4 * 1024 * 1024 * 1024;

/// How many times Docker may restart the Synapse container if it exits with an error.
const MAX_SYNAPSE_RESTART_COUNT: i64 = 20;

/// The port on which Synapse listens inside the container.
const HARDCODED_GUEST_PORT: u64 = 8008;

/// The HTTP port of the main Synapse process when workers are enabled.
const HARDCODED_MAIN_PROCESS_HTTP_LISTENER_PORT: u64 = 8080;

/// How long to wait for user registration when running without workers.
const TIMEOUT_USER_REGISTRATION_SIMPLE: std::time::Duration = std::time::Duration::new(120, 0);

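/// A mapping between a port on the host and a port in the Synapse container.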
#[derive(Clone, Debug, Deserialize)]
pub struct PortMapping {
    /// The port on the host.
    pub host: u64,

    /// The port inside the container.
    pub guest: u64,
}

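/// Docker-related options for the Synapse container.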
#[derive(Debug, Deserialize, TypedBuilder)]
pub struct DockerConfig {
    #[serde(default = "DockerConfig::default_hostname")]
    #[builder(default = DockerConfig::default_hostname())]
    pub hostname: String,

    #[serde(default)]
    #[builder(default = vec![])]
    pub port_mapping: Vec<PortMapping>,
}

impl Default for DockerConfig {
    fn default() -> DockerConfig {
        Self::builder().build()
    }
}

impl DockerConfig {
    fn default_hostname() -> String {
        "synapse".to_string()
    }
}

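/// Values to patch into the `homeserver.yaml` generated by Synapse.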
#[derive(Debug, Deserialize, Serialize, TypedBuilder)]
pub struct HomeserverConfig {
    #[serde(default = "HomeserverConfig::host_port_default")]
    #[builder(default = HomeserverConfig::host_port_default())]
    pub host_port: u64,

    #[serde(default = "HomeserverConfig::server_name_default")]
    #[builder(default = HomeserverConfig::server_name_default())]
    pub server_name: String,

    #[serde(default = "HomeserverConfig::public_baseurl_default")]
    #[builder(default = HomeserverConfig::public_baseurl_default())]
    pub public_baseurl: String,

    #[serde(default = "HomeserverConfig::registration_shared_secret_default")]
    #[builder(default = HomeserverConfig::registration_shared_secret_default())]
    pub registration_shared_secret: String,

    /// Any other fields, copied verbatim into `homeserver.yaml`.
    #[serde(flatten)]
    #[builder(default)]
    pub extra_fields: HashMap<String, serde_yaml::Value>,
}

impl Default for HomeserverConfig {
    fn default() -> HomeserverConfig {
        Self::builder().build()
    }
}

impl HomeserverConfig {
    /// Set the host port and update `server_name`/`public_baseurl` accordingly.
    pub fn set_host_port(&mut self, port: u64) {
        self.host_port = port;
        self.server_name = format!("localhost:{}", port);
        self.public_baseurl = format!("http://localhost:{}", port);
    }
    pub fn host_port_default() -> u64 {
        9999
    }
    pub fn server_name_default() -> String {
        "localhost:9999".to_string()
    }
    pub fn public_baseurl_default() -> String {
        format!("http://{}", Self::server_name_default())
    }
    pub fn registration_shared_secret_default() -> String {
        "MX_TESTER_REGISTRATION_DEFAULT".to_string()
    }
}

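/// Options for running Synapse with workers.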
#[derive(Debug, TypedBuilder, Deserialize)]
pub struct WorkersConfig {
    #[serde(default)]
    #[builder(default = false)]
    pub enabled: bool,
}
impl Default for WorkersConfig {
    fn default() -> Self {
        Self::builder().build()
    }
}

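/// The top-level mx-tester configuration, typically deserialized from `mx-tester.yml`.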
#[derive(Debug, TypedBuilder, Deserialize)]
pub struct Config {
    /// A name for this test setup, used to namespace images, containers and directories.
    pub name: String,

    /// Modules to build and install in the Synapse image.
    #[serde(default)]
    #[builder(default)]
    pub modules: Vec<ModuleConfig>,

    #[serde(default)]
    #[builder(default)]
    pub homeserver: HomeserverConfig,

    #[serde(default)]
    #[builder(default)]
    pub up: Option<UpScript>,

    #[serde(default)]
    #[builder(default)]
    pub run: Option<Script>,

    #[serde(default)]
    #[builder(default)]
    pub down: Option<DownScript>,

    #[serde(default)]
    #[builder(default)]
    pub docker: DockerConfig,

    /// Users to register on the homeserver during `up`.
    #[serde(default)]
    #[builder(default)]
    pub users: Vec<User>,

    #[serde(default)]
    #[builder(default)]
    pub synapse: SynapseVersion,

    /// Docker registry credentials, used when building/pulling the Synapse image.
    #[serde(default)]
    #[builder(default)]
    pub credentials: DockerCredentials,

    #[serde(default)]
    #[builder(default)]
    pub directories: Directories,

    #[serde(default)]
    #[builder(default)]
    pub workers: WorkersConfig,

    /// If `true` (the default), clean up Docker resources when a step fails.
    #[serde(default = "util::true_")]
    #[builder(default = true)]
    pub autoclean_on_error: bool,
}

impl Config {
    /// The environment variables shared by all the scripts.
    pub fn shared_env_variables(&self) -> Result<HashMap<&'static OsStr, OsString>, Error> {
        let synapse_root = self.synapse_root();
        let script_tmpdir = synapse_root.join("scripts");
        std::fs::create_dir_all(&script_tmpdir)
            .with_context(|| format!("Could not create directory {:#?}", script_tmpdir))?;
        let curdir = std::env::current_dir()?;
        let env: HashMap<&'static OsStr, OsString> = std::iter::IntoIterator::into_iter([
            (
                MX_TEST_SYNAPSE_DIR.as_os_str(),
                synapse_root.as_os_str().into(),
            ),
            (
                MX_TEST_SCRIPT_TMPDIR.as_os_str(),
                script_tmpdir.as_os_str().into(),
            ),
            (MX_TEST_CWD.as_os_str(), curdir.as_os_str().into()),
            (MX_TEST_NETWORK_NAME.as_os_str(), self.network().into()),
            (
                MX_TEST_SETUP_CONTAINER_NAME.as_os_str(),
                self.setup_container_name().into(),
            ),
            (
                MX_TEST_UP_RUN_DOWN_CONTAINER_NAME.as_os_str(),
                self.run_container_name().into(),
            ),
        ])
        .chain(
            if self.workers.enabled {
                Some((MX_TEST_WORKERS_ENABLED.as_os_str(), "true".into()))
            } else {
                None
            }
            .into_iter(),
        )
        .collect();
        Ok(env)
    }

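    /// Patch the `homeserver.yaml` generated by Synapse with the values from this configuration.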
    pub fn patch_homeserver_config(&self) -> Result<(), Error> {
        use serde_yaml::Mapping;
        let target_path = self.synapse_root().join("data").join("homeserver.yaml");
        debug!("Attempting to open {:#?}", target_path);
        let config_file = std::fs::File::open(&target_path)
            .context("Could not open the homeserver.yaml generated by synapse")?;
        let mut config: Mapping = serde_yaml::from_reader(config_file)
            .context("The homeserver.yaml generated by synapse is invalid")?;
        self.patch_homeserver_config_content(&mut config)?;
        serde_yaml::to_writer(std::fs::File::create(&target_path)?, &config)
            .context("Could not write combined homeserver config")?;
        Ok(())
    }
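    /// Apply the patches to an in-memory `homeserver.yaml` mapping: overwrite the
    /// server identity fields, merge `extra_fields`, relax the rate limits, rewrite
    /// the listeners, register the modules and, when workers are enabled, also patch
    /// the workers' shared configuration file.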
    pub fn patch_homeserver_config_content(
        &self,
        config: &mut serde_yaml::Mapping,
    ) -> Result<(), Error> {
        use serde_yaml::Value as YAML;
        const LISTENERS: &str = "listeners";
        const MODULES: &str = "modules";
        let combined_config = config;

        // Overwrite the server identity fields with the values from mx-tester.
        for (key, value) in [
            ("public_baseurl", &self.homeserver.public_baseurl),
            ("server_name", &self.homeserver.server_name),
            (
                "registration_shared_secret",
                &self.homeserver.registration_shared_secret,
            ),
        ] {
            combined_config.insert(key.into(), value.to_string().into());
        }
        combined_config.insert(
            "enable_registration_without_verification".into(),
            true.into(),
        );

        // Copy any extra fields verbatim.
        for (key, value) in &self.homeserver.extra_fields {
            combined_config.insert(YAML::from(key.clone()), value.clone());
        }

        // Relax the rate limits, unless the test configuration overrides them.
        let large_rate_limit: serde_yaml::Value = yaml!({
            "per_second" => 1_000_000_000,
            "burst_count" => 1_000_000_000,
        });
        for (key, rate_limit) in &[
            ("rc_message", large_rate_limit.clone()),
            ("rc_registration", large_rate_limit.clone()),
            ("rc_admin_redaction", large_rate_limit.clone()),
            (
                "rc_login",
                yaml!({
                    "address" => large_rate_limit.clone(),
                    "account" => large_rate_limit.clone(),
                    "failed_attempts" => large_rate_limit.clone(),
                }),
            ),
            (
                "rc_invites",
                yaml!({
                    "per_room" => large_rate_limit.clone(),
                    "per_user" => large_rate_limit.clone(),
                    "per_sender" => large_rate_limit.clone(),
                }),
            ),
        ] {
            if !combined_config.contains_key(key) {
                // No value specified: use the large rate limit.
                combined_config.insert(key.to_string().into(), rate_limit.clone());
            } else if combined_config[key].is_default() {
                // The test explicitly requested Synapse's default: remove the key.
                combined_config.remove(key);
            }
            // Otherwise, keep the value specified by the test.
        }

        // Rewrite the listeners: a single HTTP listener serving client and federation traffic.
        let listeners = combined_config
            .entry(LISTENERS.into())
            .or_insert_with(|| yaml!([]));
        *listeners = yaml!([yaml!({
            "port" => if self.workers.enabled { HARDCODED_MAIN_PROCESS_HTTP_LISTENER_PORT } else { HARDCODED_GUEST_PORT },
            "tls" => false,
            "type" => "http",
            "bind_addresses" => yaml!(["::"]),
            "x_forwarded" => false,
            "resources" => yaml!([
                yaml!({
                    "names" => yaml!(["client"]),
                    "compress" => true
                }),
                yaml!({
                    "names" => yaml!(["federation"]),
                    "compress" => false
                })
            ]),
        })]);
        if self.workers.enabled {
            // The workers need a replication listener on the main process.
            listeners
                .as_sequence_mut()
                .unwrap()
                .push(yaml!({
                    "port" => 9093,
                    "bind_address" => "127.0.0.1",
                    "type" => "http",
                    "resources" => yaml!([
                        yaml!({
                            "names" => yaml!(["replication"])
                        })
                    ])
                }));
        }

        // Register the modules.
        let modules_root = combined_config
            .entry(MODULES.into())
            .or_insert_with(|| yaml!([]))
            .to_seq_mut()
            .ok_or_else(|| anyhow!("In homeserver.yaml, expected a sequence for key `modules`"))?;
        for module in &self.modules {
            modules_root.push(module.config.clone());
        }

        if self.workers.enabled {
            // Additional settings that workers require in homeserver.yaml.
            for (key, value) in std::iter::IntoIterator::into_iter([
                (
                    "redis",
                    yaml!({
                        "enabled" => true,
                    }),
                ),
                (
                    "database",
                    yaml!({
                        "name" => "psycopg2",
                        "txn_limit" => 10_000,
                        "args" => yaml!({
                            "user" => "synapse",
                            "password" => "password",
                            "host" => "localhost",
                            "port" => 5432,
                            "cp_min" => 5,
                            "cp_max" => 10
                        })
                    }),
                ),
                ("notify_appservices", yaml!(false)),
                ("send_federation", yaml!(false)),
                ("update_user_directory", yaml!(false)),
                ("start_pushers", yaml!(false)),
                ("url_preview_enabled", yaml!(false)),
                (
                    "url_preview_ip_range_blacklist",
                    yaml!(["255.255.255.255/32",]),
                ),
                ("suppress_key_server_warning", yaml!(true)),
            ]) {
                combined_config.insert(yaml!(key), value);
            }

            // Also patch the shared configuration used by the workers.
            let conf_path = self.synapse_workers_dir().join("shared.yaml");
            let conf_file = std::fs::File::open(&conf_path).with_context(|| {
                format!("Could not open workers shared config: {:?}", conf_path)
            })?;
            let mut config: serde_yaml::Mapping = serde_yaml::from_reader(&conf_file)
                .with_context(|| {
                    format!("Could not parse workers shared config: {:?}", conf_path)
                })?;

            let modules_root = config
                .entry(MODULES.into())
                .or_insert_with(|| yaml!([]))
                .to_seq_mut()
                .ok_or_else(|| anyhow!("In shared.yaml, expected a sequence for key `modules`"))?;
            for module in &self.modules {
                modules_root.push(module.config.clone());
            }

            for (key, value) in std::iter::IntoIterator::into_iter([
                ("url_preview_enabled", yaml!(false)),
                (
                    "url_preview_ip_range_blacklist",
                    yaml!(["255.255.255.255/32"]),
                ),
                (
                    "database",
                    yaml!({
                        "name" => "psycopg2",
                        "txn_limit" => 10_000,
                        "args" => yaml!({
                            "user" => "synapse",
                            "password" => "password",
                            "host" => "localhost",
                            "port" => 5432,
                            "cp_min" => 5,
                            "cp_max" => 10
                        })
                    }),
                ),
            ]) {
                config.insert(yaml!(key), value);
            }

            serde_yaml::to_writer(std::fs::File::create(&conf_path)?, &config)
                .context("Could not write workers shared config")?;
        }

        Ok(())
    }

    /// The root directory for this test setup: `directories.root` joined with the test name.
    pub fn test_root(&self) -> PathBuf {
        self.directories.root.join(&self.name)
    }

    /// The directory in which everything Synapse-related is placed.
    pub fn synapse_root(&self) -> PathBuf {
        self.test_root().join("synapse")
    }

    /// The directory mounted as `/data` in the Synapse container.
    pub fn synapse_data_dir(&self) -> PathBuf {
        self.synapse_root().join("data")
    }

    /// The directory mounted as `/conf/workers` in the Synapse container.
    pub fn synapse_workers_dir(&self) -> PathBuf {
        self.synapse_root().join("workers")
    }

    /// The directory holding the nginx and supervisor configurations.
    pub fn etc_dir(&self) -> PathBuf {
        self.test_root().join("etc")
    }

    /// The directory holding the logs.
    pub fn logs_dir(&self) -> PathBuf {
        self.test_root().join("logs")
    }

    /// The directory holding the logs of the scripts run by mx-tester.
    pub fn scripts_logs_dir(&self) -> PathBuf {
        self.logs_dir().join("mx-tester")
    }

    /// The tag of the Docker image built for this test.
    pub fn tag(&self) -> String {
        match self.synapse {
            SynapseVersion::Docker { ref tag } => {
                format!(
                    "mx-tester-synapse-{}-{}{workers}",
                    tag,
                    self.name,
                    workers = if self.workers.enabled { "-workers" } else { "" }
                )
            }
        }
    }

    /// The name of the Docker network used for this test.
    pub fn network(&self) -> String {
        format!("net-{}", self.tag())
    }

    /// The name of the container used to generate `homeserver.yaml`.
    pub fn setup_container_name(&self) -> String {
        format!(
            "mx-tester-synapse-setup-{}{}",
            self.name,
            if self.workers.enabled { "-workers" } else { "" }
        )
    }

    /// The name of the container used for the `up`/`run`/`down` steps.
    pub fn run_container_name(&self) -> String {
        format!(
            "mx-tester-synapse-run-{}{}",
            self.name,
            if self.workers.enabled { "-workers" } else { "" }
        )
    }
}

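/// The host directories used by mx-tester.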
#[derive(Debug, TypedBuilder, Deserialize)]
pub struct Directories {
    /// The root of the directory tree; by default, `mx-tester` under the system temporary directory.
    #[builder(default=std::env::temp_dir().join("mx-tester"))]
    pub root: PathBuf,
}
impl Default for Directories {
    fn default() -> Self {
        Directories::builder().build()
    }
}

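/// The outcome of the test, used to decide which `down` scripts to run.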
pub enum Status {
    /// The `run` script succeeded.
    Success,

    /// The `run` script failed.
    Failure,

    /// Neither success nor failure, e.g. when `down` is invoked on its own;
    /// only the `finally` script runs.
    Manual,
}

const DEFAULT_SYNAPSE_VERSION: &str = "matrixdotorg/synapse:latest";

#[derive(Debug, Deserialize)]
pub enum SynapseVersion {
    #[serde(rename = "docker")]
    Docker { tag: String },
}
impl Default for SynapseVersion {
    fn default() -> Self {
        Self::Docker {
            tag: DEFAULT_SYNAPSE_VERSION.to_string(),
        }
    }
}

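/// A script, i.e. a list of shell commands to execute in sequence.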
#[derive(Debug, Deserialize)]
#[serde(transparent)]
pub struct Script {
    lines: Vec<String>,
}
impl Script {
    /// Run the script line by line, logging stdout/stderr captures to `log_dir`.
    pub async fn run(
        &self,
        stage: &'static str,
        log_dir: &PathBuf,
        env: &HashMap<&'static OsStr, OsString>,
    ) -> Result<(), Error> {
        debug!("Running with environment variables {:#?}", env);
        println!(
            "** running {} script. See stdout and stderr captures in {:?}",
            stage,
            log_dir.join(stage)
        );
        // Remove any stale captures from a previous run.
        let _ = std::fs::remove_file(log_dir.join(stage).as_path().with_extension("log"));
        let _ = std::fs::remove_file(log_dir.join(stage).as_path().with_extension("out"));
        let executor = Executor::try_new().context("Cannot instantiate executor")?;
        for line in &self.lines {
            println!("*** {}", line);
            let mut command = executor
                .command(line)
                .with_context(|| format!("Could not interpret `{}` as shell script", line))?;
            command.envs(env);
            debug!("Running command {:?}", command);
            command
                .spawn_logged(log_dir, stage, line)
                .await
                .with_context(|| format!("Error within line {line}", line = line))?;
        }
        println!("** running {} script success", stage);
        Ok(())
    }
}

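/// A Synapse module (plugin) to build and install into the Synapse image.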
#[derive(Debug, Deserialize)]
pub struct ModuleConfig {
    /// The name of the module; also the name of the directory in which it is built.
    name: String,

    /// A script to build the module. It runs with `MX_TEST_MODULE_DIR` pointing
    /// to the directory in which the module should be placed.
    build: Script,

    /// An optional script to install the module, executed as `RUN` instructions
    /// while building the Docker image.
    #[serde(default)]
    install: Option<Script>,

    /// Environment variables to set in the Docker image (`ENV` instructions).
    #[serde(default)]
    env: HashMap<String, String>,

    /// Files to copy into the module's directory in the image: destination -> source.
    #[serde(default)]
    copy: HashMap<String, String>,

    /// The YAML configuration for the module, appended to the `modules` section of `homeserver.yaml`.
    config: serde_yaml::Value,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum UpScript {
    /// A script with distinct `before` and `after` phases.
    FullUpScript(FullUpScript),

    /// A single script, executed before bringing up Synapse.
    SimpleScript(Script),
}
impl Default for UpScript {
    fn default() -> Self {
        UpScript::FullUpScript(FullUpScript::default())
    }
}

#[derive(Debug, Deserialize, Default)]
pub struct FullUpScript {
    /// Executed before bringing up Synapse.
    before: Option<Script>,

    /// Executed after Synapse is up and the users are registered.
    after: Option<Script>,
}

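/// Scripts for the `down` step.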
#[derive(Debug, Deserialize)]
pub struct DownScript {
    /// Executed if the test was a success.
    success: Option<Script>,

    /// Executed if the test was a failure.
    failure: Option<Script>,

    /// Executed in all cases, after `success`/`failure`.
    finally: Option<Script>,
}

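/// Create, connect and start a Synapse container, then execute `cmd` in it.
///
/// If `detach` is `false`, wait for the command to complete and capture its output;
/// otherwise, leave it running in the background.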
async fn start_synapse_container(
    docker: &Docker,
    config: &Config,
    container_name: &str,
    cmd: Vec<String>,
    detach: bool,
) -> Result<(), Error> {
    let data_dir = config.synapse_data_dir();
    let data_dir = data_dir.as_path();

    let mut env = vec![
        format!("SYNAPSE_SERVER_NAME={}", config.homeserver.server_name),
        "SYNAPSE_REPORT_STATS=no".into(),
        "SYNAPSE_CONFIG_DIR=/data".into(),
        format!(
            "SYNAPSE_HTTP_PORT={}",
            if config.workers.enabled {
                HARDCODED_MAIN_PROCESS_HTTP_LISTENER_PORT
            } else {
                HARDCODED_GUEST_PORT
            }
        ),
    ];
    if config.workers.enabled {
        env.push("SYNAPSE_WORKER_TYPES=event_persister, event_persister, background_worker, frontend_proxy, event_creator, user_dir, media_repository, federation_inbound, federation_reader, federation_sender, synchrotron, appservice, pusher".to_string());
        env.push("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK=1".to_string());
    }
    let env = env;
    debug!("We need to create container for {}", container_name);

    // Port mappings: the ones requested by the configuration, plus the homeserver port.
    let mut host_port_bindings = HashMap::new();
    let mut exposed_ports = HashMap::new();
    for mapping in config.docker.port_mapping.iter().chain(
        [PortMapping {
            host: config.homeserver.host_port,
            guest: HARDCODED_GUEST_PORT,
        }]
        .iter(),
    ) {
        let key = format!("{}/tcp", mapping.guest);
        host_port_bindings.insert(
            key.clone(),
            Some(vec![PortBinding {
                host_port: Some(format!("{}", mapping.host)),
                ..PortBinding::default()
            }]),
        );
        exposed_ports.insert(key.clone(), HashMap::new());
    }
    debug!("port_bindings: {:#?}", host_port_bindings);

    debug!("Creating container {}", container_name);
    let response = docker
        .create_container(
            Some(CreateContainerOptions {
                name: container_name,
            }),
            BollardContainerConfig {
                env: Some(env.clone()),
                exposed_ports: Some(exposed_ports),
                hostname: Some(config.docker.hostname.clone()),
                host_config: Some(HostConfig {
                    log_config: Some(HostConfigLogConfig {
                        typ: Some("json-file".to_string()),
                        config: None,
                    }),
                    // Restart Synapse if it fails, up to MAX_SYNAPSE_RESTART_COUNT times.
                    restart_policy: Some(RestartPolicy {
                        name: Some(RestartPolicyNameEnum::ON_FAILURE),
                        maximum_retry_count: Some(MAX_SYNAPSE_RESTART_COUNT),
                    }),
                    memory_reservation: Some(MEMORY_ALLOCATION_BYTES),
                    memory_swap: Some(-1),
                    // Directories shared with the host.
                    binds: Some(vec![
                        format!("{}:/data:rw", data_dir.as_os_str().to_string_lossy()),
                        format!(
                            "{}:/conf/workers:rw",
                            config.synapse_workers_dir().to_string_lossy()
                        ),
                        format!(
                            "{}:/etc/nginx/conf.d:rw",
                            config.etc_dir().join("nginx").to_string_lossy()
                        ),
                        format!(
                            "{}:/etc/supervisor/conf.d:rw",
                            config.etc_dir().join("supervisor").to_string_lossy()
                        ),
                        format!(
                            "{}:/var/log/nginx:rw",
                            config.logs_dir().join("nginx").to_string_lossy()
                        ),
                        format!(
                            "{}:/var/log/workers:rw",
                            config.logs_dir().join("workers").to_string_lossy()
                        ),
                    ]),
                    port_bindings: Some(host_port_bindings),
                    // Make the host reachable from the container as `host.docker.internal`.
                    #[cfg(target_os = "linux")]
                    extra_hosts: Some(vec!["host.docker.internal:host-gateway".to_string()]),
                    ..HostConfig::default()
                }),
                image: Some(config.tag()),
                attach_stderr: Some(true),
                attach_stdout: Some(true),
                attach_stdin: Some(false),
                cmd: Some(cmd.clone()),
                volumes: Some(
                    vec![
                        ("/data".to_string(), HashMap::new()),
                        ("/conf/workers".to_string(), HashMap::new()),
                        ("/etc/nginx/conf.d".to_string(), HashMap::new()),
                        ("/etc/supervisor/conf.d".to_string(), HashMap::new()),
                        ("/var/log/workers".to_string(), HashMap::new()),
                    ]
                    .into_iter()
                    .collect(),
                ),
                tty: Some(false),
                #[cfg(unix)]
                user: Some(format!("{}", nix::unistd::getuid())),
                ..BollardContainerConfig::default()
            },
        )
        .await
        .context("Failed to build container")?;

    // Log when the container stops, in the background.
    let mut wait = docker.wait_container(
        container_name,
        Some(WaitContainerOptions {
            condition: "not-running",
        }),
    );
    {
        let container_name = container_name.to_string();
        tokio::task::spawn(async move {
            debug!(target: "mx-tester-wait", "{} Container started", container_name);
            while let Some(next) = wait.next().await {
                let response = next.context("Error while waiting for container to stop")?;
                debug!(target: "mx-tester-wait", "{} {:#?}", container_name, response);
            }
            debug!(target: "mx-tester-wait", "{} Container is now down", container_name);
            Ok::<(), Error>(())
        });
    }

    for warning in response.warnings {
        warn!(target: "creating-container", "{}", warning);
    }

    docker
        .connect_network(
            config.network().as_ref(),
            ConnectNetworkOptions {
                container: container_name,
                endpoint_config: EndpointSettings::default(),
            },
        )
        .await
        .context("Failed to connect container")?;

    let is_container_running = docker.is_container_running(container_name).await?;
    if !is_container_running {
        docker
            .start_container(container_name, None::<StartContainerOptions<String>>)
            .await
            .context("Failed to start container")?;
        let mut logs = docker.logs(
            container_name,
            Some(LogsOptions {
                follow: true,
                stdout: true,
                stderr: true,
                tail: "10",
                ..LogsOptions::default()
            }),
        );

        // Copy the container logs to a file, in the background.
        let log_file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(config.logs_dir().join("docker").join(format!(
                "{}.log",
                if detach { "up-run-down" } else { "build" }
            )))
            .await?;
        let mut buffer = BufWriter::new(log_file);
        tokio::task::spawn(async move {
            debug!(target: "mx-tester-log", "Starting log watcher");
            while let Some(next) = logs.next().await {
                match next {
                    Ok(content) => {
                        debug!(target: "mx-tester-log", "{}", content);
                        buffer.write_all(format!("{}", content).as_bytes()).await?;
                        buffer.flush().await?;
                    }
                    Err(err) => {
                        error!(target: "mx-tester-log", "{}", err);
                        buffer
                            .write_all(format!("ERROR: {}", err).as_bytes())
                            .await?;
                        buffer.flush().await?;
                        return Err(err).context("Error in log");
                    }
                }
            }
            debug!(target: "mx-tester-log", "Stopped log watcher");
            Ok(())
        });
    }

    let cleanup = if config.autoclean_on_error {
        Some(Cleanup::new(config))
    } else {
        None
    };
    let exec = docker
        .create_exec(
            container_name,
            CreateExecOptions::<Cow<'_, str>> {
                cmd: Some(cmd.into_iter().map(|s| s.into()).collect()),
                env: Some(env.into_iter().map(|s| s.into()).collect()),
                #[cfg(unix)]
                user: Some(format!("{}", nix::unistd::getuid()).into()),
                ..CreateExecOptions::default()
            },
        )
        .await
        .context("Error while preparing command for Synapse container")?;
    let execution = docker
        .start_exec(
            &exec.id,
            Some(StartExecOptions {
                detach,
                ..StartExecOptions::default()
            }),
        )
        .await
        .context("Error starting Synapse container")?;

    if !detach {
        // Capture the output of the command to a file.
        let log_file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(config.logs_dir().join("docker").join(format!(
                "{}.out",
                if detach { "up-run-down" } else { "build" }
            )))
            .await?;
        let mut buffer = BufWriter::new(log_file);
        tokio::task::spawn(async move {
            debug!(target: "synapse", "Launching Synapse container");
            match execution {
                bollard::exec::StartExecResults::Attached {
                    mut output,
                    input: _,
                } => {
                    while let Some(data) = output.next().await {
                        let output = data.context("Error during run")?;
                        debug!(target: "synapse", "{}", output);
                        buffer.write_all(format!("{}", output).as_bytes()).await?;
                        buffer.flush().await?;
                    }
                }
                bollard::exec::StartExecResults::Detached => {
                    panic!("Exec started detached even though we requested an attached exec")
                }
            }
            debug!(target: "synapse", "Synapse container finished");
            Ok::<(), Error>(())
        })
        .await??;
    }
    cleanup.disarm();
    Ok(())
}

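/// The `build` step: build the modules, assemble a custom Dockerfile on top of the
/// Synapse image, and build the resulting Docker image.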
pub async fn build(docker: &Docker, config: &Config) -> Result<(), Error> {
    let SynapseVersion::Docker {
        tag: ref docker_tag,
    } = config.synapse;
    let setup_container_name = config.setup_container_name();
    let run_container_name = config.run_container_name();

    println!("\n* build step: starting");

    // Remove any leftover containers and images from a previous build.
    let _ = docker.stop_container(&run_container_name, None).await;
    let _ = docker.remove_container(&run_container_name, None).await;
    let _ = docker.stop_container(&setup_container_name, None).await;
    let _ = docker.remove_container(&setup_container_name, None).await;
    let _ = docker.remove_image(config.tag().as_ref(), None, None).await;

    let synapse_root = config.synapse_root();
    let _ = std::fs::remove_dir_all(config.test_root());
    let modules_log_dir = config.scripts_logs_dir().join("modules");
    for dir in &[
        &config.synapse_data_dir(),
        &config.synapse_workers_dir(),
        &config.etc_dir().join("nginx"),
        &config.etc_dir().join("supervisor"),
        &config.logs_dir().join("docker"),
        &config.logs_dir().join("nginx"),
        &config.logs_dir().join("workers"),
        &modules_log_dir,
    ] {
        std::fs::create_dir_all(&dir)
            .with_context(|| format!("Could not create directory {:#?}", dir))?;
    }

    println!("** building modules");
    let mut env = config.shared_env_variables()?;

    for module in &config.modules {
        let path = synapse_root.join(&module.name);
        env.insert(&*MX_TEST_MODULE_DIR, path.as_os_str().into());
        debug!(
            "Calling build script for module {} with MX_TEST_MODULE_DIR={:#?}",
            &module.name, path
        );
        let log_dir = modules_log_dir.join(&module.name);
        std::fs::create_dir_all(&log_dir)
            .with_context(|| format!("Could not create directory {:#?}", log_dir))?;
        module
            .build
            .run("build", &log_dir, &env)
            .await
            .context("Error running build script")?;
        debug!("Completed one module.");
    }
    println!("** building modules success");

    if config.workers.enabled {
        // Inject the worker configuration templates shipped with mx-tester.
        let conf_dir = synapse_root.join("conf");
        std::fs::create_dir_all(&conf_dir)
            .context("Could not create directory for worker configuration file")?;
        let data = [
            (
                conf_dir.join("worker.yaml.j2"),
                include_str!("../res/workers/worker.yaml.j2"),
            ),
            (
                conf_dir.join("shared.yaml.j2"),
                include_str!("../res/workers/shared.yaml.j2"),
            ),
            (
                conf_dir.join("supervisord.conf.j2"),
                include_str!("../res/workers/supervisord.conf.j2"),
            ),
            (
                conf_dir.join("nginx.conf.j2"),
                include_str!("../res/workers/nginx.conf.j2"),
            ),
            (
                conf_dir.join("log.config"),
                include_str!("../res/workers/log.config"),
            ),
            (
                synapse_root.join("workers_start.py"),
                include_str!("../res/workers/workers_start.py"),
            ),
            (
                conf_dir.join("postgres.sql"),
                include_str!("../res/workers/postgres.sql"),
            ),
        ];
        for (path, content) in &data {
            std::fs::write(&path, content).with_context(|| {
                format!("Could not inject worker configuration file {:?}", path)
            })?;
        }
    }

    let dockerfile_content = format!("
# A custom Dockerfile to rebuild synapse from the official release + plugins

FROM {docker_tag}

VOLUME [\"/data\", \"/conf/workers\", \"/etc/nginx/conf.d\", \"/etc/supervisor/conf.d\", \"/var/log/workers\"]

# We're not running as root, to avoid messing up with the host
# filesystem, so we need a proper user. We give it the current
# user's uid to make sure that files written by this Docker image
# can be read and removed by the host's user.
# Note that we need tty to workaround the following Docker issue:
# https://github.com/moby/moby/issues/31243#issuecomment-406825071
RUN useradd mx-tester {maybe_uid} --groups sudo,tty

# Add a password, to be able to run sudo. We'll use it to
# chmod files.
RUN echo \"mx-tester:password\" | chpasswd

# Show the Synapse version, to aid with debugging.
RUN pip show matrix-synapse

{maybe_setup_workers}

# Copy and install custom modules.
RUN mkdir /mx-tester
{setup}
{env}
{copy_modules}
{copy_resources}
{install}

ENTRYPOINT []

EXPOSE {synapse_http_port}/tcp 8009/tcp 8448/tcp
",
        docker_tag = docker_tag,
        setup = config.modules.iter()
            .filter_map(|module| module.install.as_ref().map(|script| format!("## Setup {}\n{}\n", module.name, script.lines.iter().map(|line| format!("RUN {}", line)).format("\n"))))
            .format("\n"),
        env = config.modules.iter()
            .map(|module| module.env.iter()
                .map(|(key, value)| format!("ENV {}={}\n", key, value))
                .format("")
            ).format(""),
        copy_modules = config.modules.iter()
            .map(|module| format!("COPY {module} /mx-tester/{module}", module=module.name))
            .format("\n"),
        copy_resources = config.modules.iter()
            .map(|module| module.copy.iter()
                .map(move |(dest, source)| format!("COPY {source} /mx-tester/{module}/{dest}\n",
                    dest = dest,
                    source = source,
                    module = module.name,
                ))
                .format("")
            ).format(""),
        install = config.modules.iter()
            .map(|module| format!("RUN /usr/local/bin/python -m pip install /mx-tester/{module}", module=module.name))
            .format("\n"),
        maybe_uid = {
            let my_uid = nix::unistd::getuid();
            if my_uid != nix::unistd::ROOT {
                Cow::from(format!("--uid {}", my_uid))
            } else {
                Cow::from("")
            }
        },
        synapse_http_port = HARDCODED_GUEST_PORT,
        maybe_setup_workers =
            if config.workers.enabled {
"
# Install dependencies
RUN apt-get update && apt-get install -y postgresql postgresql-client-13 supervisor redis nginx sudo lsof

# For workers, we're not using start.py but workers_start.py
# (which does call start.py, but that's a long story).
COPY workers_start.py /workers_start.py
COPY conf/* /conf/

# We're not going to be running workers_start.py as root, so
# let's make sure that it *can* run, write to /etc/nginx & co.
RUN chmod ugo+rx /workers_start.py && chown mx-tester /workers_start.py
"
            } else {
                ""
            }
    );
    debug!("dockerfile {}", dockerfile_content);

    let dockerfile_path = synapse_root.join("Dockerfile");
    std::fs::write(&dockerfile_path, dockerfile_content)
        .with_context(|| format!("Could not write file {:#?}", dockerfile_path))?;

    // Pack the Synapse directory into a tarball: the build context for `docker build`.
    debug!("Building tar file");
    let docker_dir_path = config.test_root().join("tar");
    std::fs::create_dir_all(&docker_dir_path)
        .with_context(|| format!("Could not create directory {:#?}", docker_dir_path))?;
    let body = {
        let tar_path = docker_dir_path.join("docker.tar");
        {
            let tar_file = std::fs::File::create(&tar_path)?;
            let mut tar_builder = tar::Builder::new(std::io::BufWriter::new(tar_file));
            debug!("tar: adding directory {:#?}", synapse_root);
            tar_builder
                .append_dir_all("", &synapse_root)
                .with_context(|| format!("Error while creating tar for {:#?}", &synapse_root))?;
            tar_builder
                .finish()
                .with_context(|| format!("Error finalizing tar for {:#?}", &synapse_root))?
        }

        let tar_file = tokio::fs::File::open(&tar_path).await?;
        let stream = FramedRead::new(tar_file, BytesCodec::new());
        hyper::Body::wrap_stream(stream)
    };
    let logs_path = config.logs_dir().join("docker").join("build.log");
    println!(
        "** building Docker image. Logs will be stored at {:?}",
        logs_path
    );
    debug!("Building image with tag {}", config.tag());
    {
        let mut log =
            std::fs::File::create(logs_path).context("Could not create docker build logs")?;
        let mut stream = docker.build_image(
            bollard::image::BuildImageOptions {
                pull: true,
                nocache: true,
                t: config.tag(),
                q: false,
                rm: true,
                ..Default::default()
            },
            config.credentials.serveraddress.as_ref().map(|server| {
                let mut credentials = HashMap::new();
                credentials.insert(server.clone(), config.credentials.clone());
                credentials
            }),
            Some(body),
        );
        while let Some(result) = stream.next().await {
            let info = result.context("Daemon `docker build` indicated an error")?;
            if let Some(ref error) = info.error {
                return Err(anyhow!("Error while building an image: {}", error));
            }
            if let Some(ref progress) = info.progress {
                debug!("Build image progress {:#?}", info);
                log.write_all(progress.as_bytes())
                    .context("Could not write docker build logs")?;
            }
        }
    }
    debug!("Image built");
    println!("** building Docker image success");

    println!("* build step: success");
    Ok(())
}

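/// The `up` step: create the Docker network, run the `up` scripts, generate and patch
/// `homeserver.yaml`, start Synapse and register the configured users.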
pub async fn up(docker: &Docker, config: &Config) -> Result<(), Error> {
    let SynapseVersion::Docker { .. } = config.synapse;
    let cleanup = if config.autoclean_on_error {
        Some(Cleanup::new(config))
    } else {
        None
    };

    println!("\n* up step: starting");
    // Create the network if it doesn't exist yet.
    let network_name = config.network();
    debug!("We'll need network {}", network_name);
    if !docker.is_network_up(&network_name).await? {
        debug!("Creating network {}", network_name);
        docker
            .create_network(CreateNetworkOptions {
                name: Cow::from(network_name.as_str()),
                check_duplicate: true,
                attachable: true,
                ..CreateNetworkOptions::default()
            })
            .await?;
        assert!(
            docker.is_network_up(&network_name).await?,
            "The network should now be up"
        );
        debug!("Network is now up");
    } else {
        debug!("Network {} already exists", network_name);
    }

    // Run the `up` script (or its `before` part), if any.
    let script_log_dir = config.scripts_logs_dir();
    match config.up {
        Some(UpScript::FullUpScript(FullUpScript {
            before: Some(ref script),
            ..
        }))
        | Some(UpScript::SimpleScript(ref script)) => {
            let env = config.shared_env_variables()?;
            script
                .run("up", &script_log_dir, &env)
                .await
                .context("Error running `up` script (before)")?;
        }
        _ => {}
    }

    let setup_container_name = config.setup_container_name();
    let run_container_name = config.run_container_name();

    let synapse_data_directory = config.synapse_data_dir();
    std::fs::create_dir_all(&synapse_data_directory)
        .with_context(|| format!("Cannot create directory {:#?}", synapse_data_directory))?;

    // Remove any stale homeserver.yaml before regenerating it.
    let homeserver_path = synapse_data_directory.join("homeserver.yaml");
    let _ = std::fs::remove_file(&homeserver_path);

    // Generate homeserver.yaml in a short-lived setup container.
    start_synapse_container(
        docker,
        config,
        &setup_container_name,
        if config.workers.enabled {
            vec!["/workers_start.py".to_string(), "generate".to_string()]
        } else {
            vec!["/start.py".to_string(), "generate".to_string()]
        },
        false,
    )
    .await
    .context("Couldn't generate homeserver.yaml")?;

    debug!("done generating");
    let _ = docker.stop_container(&setup_container_name, None).await;
    let _ = docker.remove_container(&setup_container_name, None).await;
    docker.wait_container_removed(&setup_container_name).await?;

    debug!("Updating homeserver.yaml");
    config
        .patch_homeserver_config()
        .context("Error updating homeserver config")?;

    while docker.is_container_running(&setup_container_name).await? {
        debug!(
            "Waiting until docker container {} is down before relaunching it",
            setup_container_name
        );
        tokio::time::sleep(std::time::Duration::new(5, 0)).await;
    }

    println!(
        "** starting Synapse. Logs will be stored at {:?}",
        config.logs_dir().join("docker").join("up-run-down.log")
    );
    start_synapse_container(
        docker,
        config,
        &run_container_name,
        if config.workers.enabled {
            vec!["/workers_start.py".to_string(), "start".to_string()]
        } else {
            vec!["/start.py".to_string()]
        },
        true,
    )
    .await
    .context("Failed to start Synapse")?;

    debug!("Synapse should now be launched and ready");

    // Register the users specified in the configuration.
    let registration = async {
        handle_user_registration(config)
            .await
            .context("Failed to setup users")
    };

    if config.workers.enabled {
        // No registration timeout when workers are enabled.
        registration.await?;
    } else {
        match tokio::time::timeout(TIMEOUT_USER_REGISTRATION_SIMPLE, registration).await {
            Err(_) => {
                panic!(
                    "User registration is taking too long. {is_running}",
                    is_running = if docker.is_container_running(&run_container_name).await? {
                        "Container is running, so this is usually an error in Synapse or modules."
                    } else {
                        "For some reason, the Docker image has stopped."
                    },
                );
            }
            Ok(result) => result,
        }?
    };
    if let Some(UpScript::FullUpScript(FullUpScript {
        after: Some(ref script),
        ..
    })) = config.up
    {
        let env = config.shared_env_variables()?;
        script
            .run("up", &script_log_dir, &env)
            .await
            .context("Error running `up` script (after)")?;
    }

    cleanup.disarm();

    println!("* up step: success");
    Ok(())
}

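/// The `down` step: run the `down` scripts, then stop and remove the Synapse container
/// and the Docker network.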
pub async fn down(docker: &Docker, config: &Config, status: Status) -> Result<(), Error> {
    let SynapseVersion::Docker { .. } = config.synapse;
    let run_container_name = config.run_container_name();

    println!("\n* down step: starting");

    let script_log_dir = config.scripts_logs_dir();
    let script_result = if let Some(ref down_script) = config.down {
        let env = config.shared_env_variables()?;
        let result = match (status, down_script) {
            (
                Status::Failure,
                DownScript {
                    failure: Some(ref on_failure),
                    ..
                },
            ) => on_failure
                .run("on_failure", &script_log_dir, &env)
                .await
                .context("Error while running script `down/failure`"),
            (
                Status::Success,
                DownScript {
                    success: Some(ref on_success),
                    ..
                },
            ) => on_success
                .run("on_success", &script_log_dir, &env)
                .await
                .context("Error while running script `down/success`"),
            _ => Ok(()),
        };
        if let Some(ref on_always) = down_script.finally {
            result.and(
                on_always
                    .run("on_always", &script_log_dir, &env)
                    .await
                    .context("Error while running script `down/finally`"),
            )
        } else {
            result
        }
    } else {
        Ok(())
    };

    debug!(target: "mx-tester-down", "Taking down synapse.");
    let stop_container_result = match docker.stop_container(&run_container_name, None).await {
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code >= 200 && status_code < 300 => {
            debug!(target: "mx-tester-down", "Synapse container stopped: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 304 => {
            debug!(target: "mx-tester-down", "Synapse container was already down: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 404 => {
            debug!(target: "mx-tester-down", "Synapse container not found for stopping: {}", message);
            Ok(())
        }
        Err(err) => Err(err).context("Error stopping container"),
        Ok(_) => {
            debug!(target: "mx-tester-down", "Synapse container stopped");
            Ok(())
        }
    };

    let remove_container_result = match docker.remove_container(&run_container_name, None).await {
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code >= 200 && status_code < 300 => {
            debug!(target: "mx-tester-down", "Synapse container removed: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 304 => {
            debug!(target: "mx-tester-down", "Synapse container was already removed: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 404 => {
            debug!(target: "mx-tester-down", "Synapse container not found for removing: {}", message);
            Ok(())
        }
        Err(err) => Err(err).context("Error removing container"),
        Ok(_) => {
            debug!(target: "mx-tester-down", "Synapse container removed");
            Ok(())
        }
    };

    debug!(target: "mx-tester-down", "Taking down network.");
    let remove_network_result = match docker.remove_network(config.network().as_ref()).await {
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code >= 200 && status_code < 300 => {
            debug!(target: "mx-tester-down", "Network removed: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 304 => {
            debug!(target: "mx-tester-down", "Network was already removed: {}", message);
            Ok(())
        }
        Err(bollard::errors::Error::DockerResponseServerError {
            message,
            status_code,
        }) if status_code == 404 => {
            debug!(target: "mx-tester-down", "Network not found for removing: {}", message);
            Ok(())
        }
        Err(err) => Err(err).context("Error removing network"),
        Ok(_) => {
            debug!(target: "mx-tester-down", "Network removed");
            Ok(())
        }
    };

    println!("* down step: complete");
    script_result
        .and(stop_container_result)
        .and(remove_container_result)
        .and(remove_network_result)
}

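/// The `run` step: execute the `run` script, if any.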
pub async fn run(_docker: &Docker, config: &Config) -> Result<(), Error> {
    println!("\n* run step: starting");
    if let Some(ref code) = config.run {
        let env = config.shared_env_variables()?;
        code.run("run", &config.scripts_logs_dir(), &env)
            .await
            .context("Error running `run` script")?;
    }
    println!("* run step: success");
    Ok(())
}

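/// Convenience extensions for the Docker client.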
#[async_trait::async_trait]
trait DockerExt {
    /// Is a network with this exact name up?
    async fn is_network_up(&self, name: &str) -> Result<bool, Error>;

    /// Is a container with this exact name currently running?
    async fn is_container_running(&self, name: &str) -> Result<bool, Error>;

    /// Has a container with this exact name been created, whether or not it is running?
    async fn is_container_created(&self, name: &str) -> Result<bool, Error>;

    /// Wait until a container with this name has been removed.
    async fn wait_container_removed(&self, name: &str) -> Result<(), Error>;
}

#[async_trait::async_trait]
impl DockerExt for Docker {
    async fn is_network_up(&self, name: &str) -> Result<bool, Error> {
        let networks = self
            .list_networks(Some(ListNetworksOptions {
                filters: vec![("name", vec![name])].into_iter().collect(),
            }))
            .await?;
        debug!("is_network_up {:#?}", networks);
        Ok(networks
            .into_iter()
            .filter_map(|network| network.name)
            .any(|candidate_name| candidate_name.as_str() == name))
    }

    async fn is_container_running(&self, name: &str) -> Result<bool, Error> {
        let containers = self
            .list_containers(Some(ListContainersOptions {
                // Only running containers.
                all: false,
                filters: vec![("name", vec![name])].into_iter().collect(),
                ..ListContainersOptions::default()
            }))
            .await?;
        debug!("is_container_running {:#?}", containers);
        let found = containers
            .into_iter()
            .flat_map(|container| container.names)
            .flat_map(|names| names.into_iter())
            .any(|container_name| container_name.as_str() == name);
        Ok(found)
    }

    async fn is_container_created(&self, name: &str) -> Result<bool, Error> {
        let containers: Vec<_> = self
            .list_containers(Some(ListContainersOptions {
                // Both running and stopped containers.
                all: true,
                filters: vec![("name", vec![name])].into_iter().collect(),
                ..ListContainersOptions::default()
            }))
            .await?;
        debug!("is_container_created {:#?}", containers);
        let found = containers
            .into_iter()
            .flat_map(|container| container.names)
            .flat_map(|names| names.into_iter())
            .any(|container_name| container_name.as_str() == name);
        Ok(found)
    }

    async fn wait_container_removed(&self, name: &str) -> Result<(), Error> {
        let mut stream = self.wait_container(
            name,
            Some(WaitContainerOptions {
                condition: "removed",
            }),
        );
        'waiting: while let Some(result) = stream.next().await {
            match result {
                Ok(bollard::models::ContainerWaitResponse {
                    error: Some(error), ..
                }) => {
                    return Err(anyhow!(
                        "Error while waiting for container {} to be removed: {:?}",
                        name,
                        error
                    ));
                }
                Ok(_) => {
                    // No error so far, keep waiting.
                }
                Err(bollard::errors::Error::DockerResponseServerError { status_code, .. })
                    if status_code == 304 =>
                {
                    // Not modified: nothing left to wait for.
                    break 'waiting;
                }
                Err(bollard::errors::Error::DockerResponseServerError { status_code, .. })
                    if status_code == 404 =>
                {
                    // The container is already gone.
                    break 'waiting;
                }
                Err(err) => {
                    return Err(err)
                        .context(format!("Waiting for container {} to be removed", name));
                }
            }
        }
        Ok(())
    }
}

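/// A value that a test configuration may set to `"synapse-default"` to request
/// Synapse's own default instead of mx-tester's override.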
trait IsDefault {
    fn is_default(&self) -> bool;
}
impl IsDefault for serde_yaml::Value {
    fn is_default(&self) -> bool {
        if let Some(str) = self.as_str() {
            if str == "synapse-default" {
                return true;
            }
        }
        false
    }
}