1mod readiness;
2
3use crate::healthcheck::api::get_health_status;
4use anyhow::{anyhow, bail, Context, Result};
5use colored::Colorize;
6use derive_builder::Builder;
7use log::{error, info, warn};
8use nix::sys::signal::{kill, Signal};
9use nix::unistd::Pid;
10use rand::RngExt;
11use serde::Deserialize;
12use serde_yaml::{Mapping, Value};
13use std::collections::{BTreeMap, HashMap};
14use std::fs;
15use std::fs::{File, OpenOptions};
16use std::io::{BufRead, BufReader, ErrorKind, Read, Write};
17use std::net::SocketAddrV4;
18use std::os::unix::fs::symlink;
19use std::path::{Path, PathBuf};
20use std::process::{Child, Command, Stdio};
21use std::str::{self};
22use std::sync::{Arc, Mutex};
23use std::thread::{self, JoinHandle};
24use std::time::{Duration, Instant};
25
26use crate::commands::lib::instance_info::InstanceSocketClient;
27use crate::commands::lib::{
28 cargo_build, copy_directory_tree, find_active_socket_path, get_cluster_dir,
29 log_instance_skipped, log_instance_started, run_query_in_picodata_admin, spawn_picodata_admin,
30 unpack_shipping_archive,
31};
32use crate::commands::lib::{get_active_socket_path, BuildType};
33use crate::commands::lib::{is_plugin_archive, is_plugin_dir, is_plugin_shipping_dir};
34
35const BAFFLED_WHALE: &str = r"
36 __________________________________________________________
37/ Iiiiiiiiit seeeeeems Piiiiicooooodaaaaataaaaaa iiiiiiiiis \
38| noooooooot iiiiiinstaaaaaalleeeeed oooooon yyyyyooooouuur |
39\ syyyyyyysteeeeeeem. /
40 ----------------------------------------------------------
41 |
42 |
43 | .-------------'```'----....,,__ _,
44 | | `'`'`'`'-.,.__ .'(
45 | | `'--._.' )
46 | | `'-.<
47 | \ .-'`'-. -. `\
48 \ \ -.o_. _ _,-'`\ |
49 \_ ``````''--.._.-=-._ .' \ _,,--'` `-._(
50 (^^^^^^^^`___ '-. | \ __,,..--' `
51 ````````` `'--..___\ |`
52 `-.,'
53 ";
54
55const TIMEOUT_WAITING_FOR_INSTANCE_READINESS: Duration = Duration::from_secs(10);
56
57#[derive(Debug, Deserialize, Clone)]
58pub struct Tier {
59 pub replicasets: u8,
60 pub replication_factor: u8,
61}
62
63#[derive(Debug, Deserialize, Clone)]
64pub struct MigrationContextVar {
65 pub name: String,
66 pub value: String,
67}
68
69#[derive(Default, Debug, Deserialize, Clone)]
70pub struct Service {
71 pub tiers: Vec<String>,
72}
73
74#[derive(Clone, Copy, Debug, Eq, PartialEq)]
76pub enum PluginPathKind {
77 CrateOrWorkspaceDirectory,
79 ShippingDirectory,
81 ShippingArchive,
83}
84
85#[derive(Default, Debug, Deserialize, Clone)]
86pub struct Plugin {
87 #[serde(default)]
88 pub migration_context: Vec<MigrationContextVar>,
89 #[serde(default)]
90 #[serde(rename = "service")]
91 pub services: BTreeMap<String, Service>,
92 #[serde(skip)]
93 pub version: Option<String>,
94 pub path: Option<PathBuf>,
102}
103
104impl Plugin {
105 fn is_external(&self) -> bool {
106 self.path.is_some()
107 }
108}
109
110#[derive(Default, Debug, Deserialize, Clone)]
111pub struct Topology {
112 #[serde(rename = "tier")]
113 pub tiers: BTreeMap<String, Tier>,
114 #[serde(rename = "plugin")]
115 #[serde(default)]
116 pub plugins: BTreeMap<String, Plugin>,
117 #[serde(default)]
118 pub enviroment: BTreeMap<String, String>,
119 #[serde(default)]
120 pub pre_install_sql: Vec<String>,
121}
122
123impl Topology {
124 pub fn parse_toml(path: &PathBuf) -> Result<Self> {
127 let content = fs::read_to_string(path)
128 .context(format!("failed to read topology from {}", path.display()))?;
129
130 let deserializer = toml::de::Deserializer::parse(&content)
131 .context(format!("failed to parse topology from {}", path.display()))?;
132
133 serde_ignored::deserialize(deserializer, |f| {
134 warn!("Unknown field in topology TOML: {f}");
135 })
136 .map_err(anyhow::Error::from)
137 }
138
139 fn find_plugin_versions(&mut self, plugins_dir: &Path) -> Result<()> {
140 for (plugin_name, plugin) in &mut self.plugins {
141 let current_plugin_dir = plugins_dir.join(plugin_name);
142
143 if !current_plugin_dir.exists() {
144 bail!(
145 "plugin directory {} does not exist",
146 current_plugin_dir.display()
147 );
148 }
149 let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)?
150 .map(|r| r.unwrap())
151 .collect();
152 versions.sort_by_key(fs::DirEntry::path);
153 let newest_version = versions
154 .last()
155 .unwrap()
156 .file_name()
157 .to_str()
158 .unwrap()
159 .to_string();
160 plugin.version = Some(newest_version);
161 }
162 Ok(())
163 }
164
165 fn has_external_plugins(&self) -> bool {
166 self.plugins.values().any(Plugin::is_external)
167 }
168}
169
170fn enable_plugins(topology: &Topology, cluster_dir: &Path, picodata_path: &Path) -> Result<()> {
171 let mut queries: Vec<String> = Vec::new();
172
173 if !topology.pre_install_sql.is_empty() {
174 info!("Executing pre-install SQL scripts...");
175 for query in &topology.pre_install_sql {
176 queries.push(query.clone());
177 }
178 }
179
180 for (plugin_name, plugin) in &topology.plugins {
181 let Some(plugin_version) = plugin.version.as_ref() else {
182 bail!("plugin version is missing for '{plugin_name}'");
183 };
184
185 queries.push(format!(
187 r#"CREATE PLUGIN "{plugin_name}" {plugin_version};"#
188 ));
189
190 for (service_name, service) in &plugin.services {
192 for tier_name in &service.tiers {
193 queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
194 }
195 }
196
197 for migration_env in &plugin.migration_context {
199 queries.push(format!(
200 "ALTER PLUGIN \"{plugin_name}\" {plugin_version} SET migration_context.{}='{}';",
201 migration_env.name, migration_env.value
202 ));
203 }
204
205 queries.push(format!(
207 r#"ALTER PLUGIN "{plugin_name}" MIGRATE TO {plugin_version};"#
208 ));
209
210 queries.push(format!(
212 r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ENABLE;"#
213 ));
214 }
215
216 let admin_socket = cluster_dir.join("i1").join("admin.sock");
217
218 for query in queries {
219 info!("picodata admin: {query}");
220
221 let mut picodata_admin = spawn_picodata_admin(picodata_path, &admin_socket)?;
222
223 {
224 let picodata_stdin = picodata_admin.stdin.as_mut().unwrap();
225 picodata_stdin
226 .write_all(query.as_bytes())
227 .context("failed to send plugin installation queries")?;
228 }
229
230 let exit_code = picodata_admin
231 .wait()
232 .context("failed to wait for picodata admin")?
233 .code()
234 .unwrap();
235
236 let outputs: [Box<dyn Read + Send>; 2] = [
237 Box::new(picodata_admin.stdout.unwrap()),
238 Box::new(picodata_admin.stderr.unwrap()),
239 ];
240
241 let mut ignore_errors = false;
242 for output in outputs {
243 let reader = BufReader::new(output);
244 for line in reader.lines() {
245 let line = line.expect("failed to read picodata admin output");
246 info!("picodata admin: {line}");
247
248 let err_messages_to_ignore: Vec<&str> = vec!["already enabled", "already exists"];
250 for err_message in err_messages_to_ignore {
251 if line.contains(err_message) {
252 ignore_errors = true;
253 }
254 }
255 }
256 }
257
258 if exit_code == 1 && !ignore_errors {
259 bail!("failed to execute picodata query {query}");
260 }
261 }
262
263 for (plugin_name, plugin) in &topology.plugins {
264 info!(
265 "Plugin {plugin_name}:{} has been enabled",
266 plugin.version.as_ref().unwrap()
267 );
268 }
269
270 Ok(())
271}
272
273fn get_ipv4_from_template_var(
274 env_vars: &BTreeMap<String, String>,
275 variable: &str,
276) -> Option<SocketAddrV4> {
277 let env_ipv4 = env_vars.get(variable)?;
278 let env_ipv4 = env_ipv4.parse::<SocketAddrV4>().unwrap_or_else(|e| {
279 panic!("could not parse {variable} to an ipv4 address: {e}. hint: use 127.0.0.1")
280 });
281 let ip = env_ipv4.ip();
282 assert!(
283 ip.is_loopback() || ip.is_unspecified(),
284 "ipv4 address {env_ipv4:?} of variable {variable} \
285 is not loopback (127.0.0.1) or unspecified (0.0.0.0), \
286 so it can't be used in pike."
287 );
288 Some(env_ipv4)
289}
290
291fn get_picodata_version(picodata_path: &PathBuf) -> Result<String> {
292 let picodata_output = Command::new(picodata_path).arg("--version").output();
293
294 let picodata_output = match picodata_output {
295 Ok(o) => o,
296 Err(err) if err.kind() == ErrorKind::NotFound => {
297 println!("{BAFFLED_WHALE}");
298 bail!("Picodata not found")
299 }
300 Err(err) => bail!("failed to get picodata version ({err})"),
301 };
302
303 Ok(str::from_utf8(&picodata_output.stdout)?.to_string())
304}
305
306#[allow(dead_code)]
307pub struct PicodataInstanceProperties<'a> {
308 pub bin_port: &'a u16,
309 pub pg_port: &'a u16,
310 pub http_port: &'a u16,
311 pub data_dir: &'a Path,
312 pub instance_name: &'a str,
313 pub tier: &'a str,
314 pub instance_id: &'a u16,
315}
316
317#[derive(Debug)]
318pub struct PicodataInstance {
319 instance_name: String,
320 instance_id: u16,
321 tier: String,
322 log_threads: Option<Vec<JoinHandle<()>>>,
323 child: Child,
324 daemon: bool,
325 disable_colors: bool,
326 data_dir: PathBuf,
327 log_file_path: PathBuf,
328 pg_port: u16,
329 bin_port: u16,
330 http_port: u16,
331}
332
333impl PicodataInstance {
334 #[allow(clippy::too_many_lines)]
335 fn new(
336 instance_id: u16,
337 plugins_dir: Option<&PathBuf>,
338 tier: &str,
339 run_params: &Params,
340 ) -> Result<Self> {
341 let mut instance_name = Self::make_name(instance_id);
343 let tiers_config = get_merged_cluster_tier_config(
344 &run_params.plugin_path,
345 &run_params.config_path,
346 &run_params.topology.tiers,
347 )?;
348
349 let cluster_dir = get_cluster_dir(&run_params.plugin_path, &run_params.data_dir);
351 let instance_data_dir = cluster_dir.join(&instance_name);
352 let log_file_path = instance_data_dir.join("picodata.log");
353 let audit_file_path = instance_data_dir.join("audit.log");
354
355 fs::create_dir_all(&instance_data_dir).context("Failed to create instance data dir")?;
356
357 let mut template_env = minijinja::Environment::new();
358
359 for x in &run_params.topology.enviroment {
360 template_env.add_template(x.0.as_str(), x.1.as_str())?;
361 }
362
363 let env_templates_ctx = minijinja::context! {
364 instance_id => instance_id,
365 };
366 let env_vars: BTreeMap<String, String> =
367 Self::compute_env_vars(&template_env, &env_templates_ctx)?;
368
369 let first_env_vars: BTreeMap<String, String> = Self::compute_env_vars(
370 &template_env,
371 &minijinja::context! {
372 instance_id => 1,
373 },
374 )?;
375
376 let mut child = Command::new(&run_params.picodata_path);
377 child.envs(&env_vars);
378
379 let first_instance_bin_ipv4 =
380 get_ipv4_from_template_var(&first_env_vars, "PICODATA_IPROTO_LISTEN")
381 .unwrap_or(format!("127.0.0.1:{}", run_params.base_bin_port + 1).parse()?);
382 let bin_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_IPROTO_LISTEN")
383 .unwrap_or(format!("127.0.0.1:{}", run_params.base_bin_port + instance_id).parse()?);
384 let http_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_HTTP_LISTEN")
385 .unwrap_or(format!("0.0.0.0:{}", run_params.base_http_port + instance_id).parse()?);
386 let pg_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_PG_LISTEN")
387 .unwrap_or(format!("127.0.0.1:{}", run_params.base_pg_port + instance_id).parse()?);
388
389 child.args([
390 "run",
391 "--instance-dir",
392 instance_data_dir.to_str().expect("unreachable"),
393 "--iproto-listen",
394 &bin_ipv4.to_string(),
395 "--peer",
396 &first_instance_bin_ipv4.to_string(),
397 "--http-listen",
398 &http_ipv4.to_string(),
399 "--pg-listen",
400 &pg_ipv4.to_string(),
401 "--tier",
402 tier,
403 "--config-parameter",
404 &format!("cluster.tier={tiers_config}",),
405 ]);
406
407 let config_path = run_params.plugin_path.join(&run_params.config_path);
408 if config_path.exists() {
409 let instance_config_path =
411 Self::template_config(&config_path, &instance_data_dir, &env_templates_ctx)?;
412 child.args([
413 "--config",
414 instance_config_path.to_str().unwrap_or("./picodata.yaml"),
415 ]);
416 } else {
417 warn!(
418 "couldn't locate picodata config at {} - skipping.",
419 config_path.to_str().unwrap()
420 );
421 }
422
423 if let Some(plugins_dir) = plugins_dir {
424 child.args([
425 "--share-dir",
426 plugins_dir.to_str().unwrap_or("target/debug"),
427 ]);
428 }
429
430 if run_params.daemon {
431 child.stdout(Stdio::null()).stderr(Stdio::null());
432 child.args(["--log", log_file_path.to_str().expect("unreachable")]);
433 } else {
434 child.stdout(Stdio::piped()).stderr(Stdio::piped());
435 }
436
437 if run_params.with_audit {
438 child.args(["--audit", audit_file_path.to_str().expect("unreachable")]);
439 }
440
441 let child = child
442 .spawn()
443 .context(format!("failed to start picodata instance: {instance_id}"))?;
444
445 let start = Instant::now();
446 while Instant::now().duration_since(start) < TIMEOUT_WAITING_FOR_INSTANCE_READINESS {
447 thread::sleep(Duration::from_millis(100));
448 let socket_client =
449 InstanceSocketClient::new(&instance_data_dir, &run_params.picodata_path);
450 let Ok(new_instance_name) = socket_client
451 .instance_name()
452 .inspect_err(|err| log::debug!("failed to get name of the instance: {err}"))
453 else {
454 continue;
455 };
456
457 let instance_current_state = socket_client.current_state()?;
460 if !instance_current_state.is_online() {
461 info!("Waiting for '{new_instance_name}' to become 'Online'");
462 continue;
463 }
464
465 let symlink_name = cluster_dir.join(&new_instance_name);
467 let _ = fs::remove_file(&symlink_name);
468 symlink(&instance_name, symlink_name)
469 .context("failed create symlink to instance dir")?;
470
471 instance_name = new_instance_name;
472 break;
473 }
474
475 let mut pico_instance = PicodataInstance {
476 instance_name,
477 tier: tier.to_string(),
478 log_threads: None,
479 child,
480 daemon: run_params.daemon,
481 disable_colors: run_params.disable_colors,
482 data_dir: instance_data_dir,
483 log_file_path,
484 pg_port: pg_ipv4.port(),
485 bin_port: bin_ipv4.port(),
486 http_port: http_ipv4.port(),
487 instance_id,
488 };
489
490 if !run_params.daemon {
491 pico_instance.capture_logs()?;
492 }
493
494 pico_instance.make_pid_file()?;
496
497 Ok(pico_instance)
498 }
499
500 pub(crate) fn http_port(&self) -> u16 {
501 self.http_port
502 }
503
504 pub(crate) fn socket_client<'a>(&self, picodata_path: &'a PathBuf) -> InstanceSocketClient<'a> {
505 InstanceSocketClient::new(&self.data_dir, picodata_path)
506 }
507
508 #[allow(dead_code)]
509 #[allow(clippy::must_use_candidate)]
510 pub fn properties(&self) -> PicodataInstanceProperties<'_> {
511 PicodataInstanceProperties {
512 bin_port: &self.bin_port,
513 pg_port: &self.pg_port,
514 http_port: &self.http_port,
515 data_dir: &self.data_dir,
516 instance_name: &self.instance_name,
517 tier: &self.tier,
518 instance_id: &self.instance_id,
519 }
520 }
521
522 fn compute_env_vars(
523 template_env: &minijinja::Environment,
524 ctx: &minijinja::Value,
525 ) -> Result<BTreeMap<String, String>> {
526 let mut result = BTreeMap::new();
527
528 for (name, template) in template_env.templates() {
529 result.insert(name.into(), template.render(ctx)?);
530 }
531
532 Ok(result)
533 }
534
535 fn template_config(
538 config_path: &Path,
539 instance_data_dir: &Path,
540 ctx: &minijinja::Value,
541 ) -> Result<PathBuf> {
542 let config_content = fs::read_to_string(config_path).with_context(|| {
543 format!(
544 "failed to read picodata config at {}",
545 config_path.display()
546 )
547 })?;
548
549 let mut parser = minijinja::Environment::new();
550
551 parser.add_template("config", &config_content)?;
552
553 let template = parser.get_template("config")?;
554
555 let rendered = template.render(ctx).with_context(|| {
556 format!(
557 "failed to render picodata config template at {}",
558 config_path.display()
559 )
560 })?;
561
562 let instance_config_path = instance_data_dir.join("picodata.yaml");
563 fs::write(&instance_config_path, &rendered).with_context(|| {
564 format!(
565 "failed to write templated picodata config to {}",
566 instance_config_path.display()
567 )
568 })?;
569
570 Ok(instance_config_path)
571 }
572
573 fn capture_logs(&mut self) -> Result<()> {
574 let mut rnd = rand::rng();
575 let instance_name_color = colored::CustomColor::new(
576 rnd.random_range(30..220),
577 rnd.random_range(30..220),
578 rnd.random_range(30..220),
579 );
580
581 let file = OpenOptions::new()
582 .create(true)
583 .truncate(true)
584 .write(true)
585 .open(&self.log_file_path)
586 .expect("Failed to open log file");
587 let file = Arc::new(Mutex::new(file));
588
589 let mut log_threads = vec![];
590
591 let stdout = self.child.stdout.take().expect("Failed to capture stdout");
592 let stderr = self.child.stderr.take().expect("Failed to capture stderr");
593 let outputs: [Box<dyn Read + Send>; 2] = [Box::new(stdout), Box::new(stderr)];
594 for child_output in outputs {
595 let mut log_prefix = format!("{}: ", self.instance_name);
596 if !self.disable_colors {
597 log_prefix = log_prefix.custom_color(instance_name_color).to_string();
598 }
599 let file = file.clone();
600
601 let wrapper = move || {
602 let stdout_lines = BufReader::new(child_output).lines();
603 for line in stdout_lines {
604 let line = line.unwrap();
605 println!("{log_prefix}{line}");
606 writeln!(file.lock().unwrap(), "{line}")
607 .expect("Failed to write line to log file");
608 }
609 };
610
611 let t = thread::Builder::new()
612 .name(format!("log_catcher::{}", self.instance_name))
613 .spawn(wrapper)?;
614
615 log_threads.push(t);
616 }
617
618 self.log_threads = Some(log_threads);
619
620 Ok(())
621 }
622
623 fn make_pid_file(&self) -> Result<()> {
624 let pid = self.child.id();
625 let pid_location = self.data_dir.join("pid");
626 let mut file = File::create(pid_location)?;
627 writeln!(file, "{pid}")?;
628 Ok(())
629 }
630
631 fn make_name(id: u16) -> String {
632 format!("i{id}")
633 }
634
635 fn kill(&mut self) -> Result<()> {
636 Ok(self.child.kill()?)
637 }
638
639 fn join(&mut self) {
640 let Some(threads) = self.log_threads.take() else {
641 return;
642 };
643 for h in threads {
644 h.join()
645 .expect("Failed to join thread for picodata instance");
646 }
647 }
648}
649
650impl Drop for PicodataInstance {
651 fn drop(&mut self) {
652 if self.daemon {
653 return;
654 }
655
656 self.child
657 .wait()
658 .expect("Failed to wait for picodata instance");
659 }
660}
661
662fn get_merged_cluster_tier_config(
663 plugin_path: &Path,
664 config_path: &Path,
665 tiers: &BTreeMap<String, Tier>,
666) -> Result<String> {
667 let picodata_conf_path = plugin_path.join(config_path);
668
669 let maybe_raw = fs::read_to_string(&picodata_conf_path);
671 let root_value: Value = match maybe_raw {
672 Err(e) if e.kind() == ErrorKind::NotFound => Value::Mapping(Mapping::new()),
673 Err(e) => {
674 return Err(e).with_context(|| {
675 format!(
676 "failed to read picodata config at {}",
677 picodata_conf_path.display()
678 )
679 })
680 }
681 Ok(raw) => {
682 if raw.trim().is_empty() {
684 Value::Mapping(Mapping::new())
685 } else {
686 serde_yaml::from_str::<Value>(&raw).with_context(|| {
687 format!(
688 "invalid YAML in picodata config at {}",
689 picodata_conf_path.display()
690 )
691 })?
692 }
693 }
694 };
695
696 let root_map = match root_value {
697 Value::Mapping(m) => m,
698 other => {
699 bail!(
700 "invalid root in picodata config at {}: expected YAML mapping object, got {:?}",
701 picodata_conf_path.display(),
702 other
703 );
704 }
705 };
706
707 let cluster_params = root_map
708 .get("cluster")
709 .and_then(Value::as_mapping)
710 .cloned()
711 .unwrap_or_else(Mapping::new);
712
713 let mut tier_params = cluster_params
714 .get("tier")
715 .and_then(Value::as_mapping)
716 .cloned()
717 .unwrap_or_else(Mapping::new);
718
719 for value in tier_params.values_mut() {
720 if value.is_null() {
721 *value = Value::Mapping(Mapping::new());
722 }
723 }
724
725 for (tier_name, tier_value) in tiers {
726 tier_params
727 .entry(Value::String(tier_name.clone()))
728 .and_modify(|entry| {
729 if let Value::Mapping(ref mut map) = entry {
730 map.insert(
731 Value::String("replication_factor".into()),
732 Value::Number(tier_value.replication_factor.into()),
733 );
734 }
735 })
736 .or_insert_with(|| {
737 let mut map = Mapping::new();
738 map.insert(
739 Value::String("replication_factor".into()),
740 Value::Number(tier_value.replication_factor.into()),
741 );
742 Value::Mapping(map)
743 });
744 }
745
746 serde_json::to_string(&tier_params).context("failed to serialize cluster.tier to JSON")
747}
748
749fn get_external_plugin_path_kind(path: &Path) -> Result<PluginPathKind> {
750 if path
752 .symlink_metadata()
753 .map(|m| m.file_type().is_symlink())
754 .unwrap_or(false)
755 {
756 bail!(
757 "symlink as external plugin path is not supported: {}",
758 path.display()
759 );
760 }
761
762 let meta = path.metadata().with_context(|| {
763 format!(
764 "failed to query external plugin path metadata at {}",
765 path.display()
766 )
767 })?;
768
769 if meta.is_file() {
770 return match is_plugin_archive(path) {
771 Ok(()) => Ok(PluginPathKind::ShippingArchive),
772 Err(error) => Err(error.context("external plugin path is an unknown file")),
773 };
774 }
775 if meta.is_dir() {
776 if is_plugin_dir(path) {
777 return Ok(PluginPathKind::CrateOrWorkspaceDirectory);
778 }
779 return match is_plugin_shipping_dir(path) {
780 Ok(()) => Ok(PluginPathKind::ShippingDirectory),
781 Err(error) => {
782 Err(error.context("external plugin path directory has invalid structure"))
783 }
784 };
785 }
786 bail!("unknown external plugin path type: '{}'", path.display());
788}
789
790fn materialize_external_plugin(
792 name: &str,
793 kind: PluginPathKind,
794 path: &PathBuf,
795 params: &Params,
796 plugin_run_dir: &Path,
797) -> Result<()> {
798 match kind {
799 PluginPathKind::ShippingArchive => {
800 unpack_shipping_archive(path, plugin_run_dir).with_context(|| {
801 format!(
802 "failed to unpack shipping archive for plugin '{}' from '{}'",
803 name,
804 path.display()
805 )
806 })?;
807 }
808 PluginPathKind::ShippingDirectory => {
809 copy_directory_tree(path, plugin_run_dir).with_context(|| {
810 format!(
811 "failed to copy shipping directory for plugin '{}' from '{}'",
812 name,
813 path.display()
814 )
815 })?;
816 }
817 PluginPathKind::CrateOrWorkspaceDirectory => {
818 let (profile, target_dir) = (params.get_build_profile(), ¶ms.target_dir);
819 if !params.no_build {
820 cargo_build(profile, target_dir, path).with_context(|| {
821 format!(
822 "failed to build external cargo plugin '{}' at '{}'",
823 name,
824 path.display()
825 )
826 })?;
827 }
828 let src_shipping_dir = path.join(target_dir).join(profile.to_string()).join(name);
829 copy_directory_tree(&src_shipping_dir, plugin_run_dir).with_context(|| {
830 format!(
831 "failed to copy built plugin '{}' from '{}' (profile {})",
832 name,
833 src_shipping_dir.display(),
834 profile
835 )
836 })?;
837 }
838 }
839 Ok(())
840}
841
842fn prepare_external_plugins(params: &Params, plugin_run_dir: &Path) -> Result<()> {
847 if !params.topology.has_external_plugins() {
848 return Ok(());
849 }
850
851 let external_plugins: Vec<_> = params
852 .topology
853 .plugins
854 .iter()
855 .filter(|(_, p)| p.is_external())
856 .collect();
857
858 info!(
859 "Found {} external plugins, loading into {}",
860 external_plugins.len(),
861 plugin_run_dir.display()
862 );
863
864 let mut path_info: HashMap<&str, (PluginPathKind, PathBuf)> =
865 HashMap::with_capacity(external_plugins.len());
866
867 for (name, plugin) in external_plugins {
868 let path = plugin
869 .path
870 .as_ref()
871 .ok_or_else(|| anyhow!("external plugin '{name}' has no path"))?;
872 let kind = get_external_plugin_path_kind(path).with_context(|| {
873 format!(
874 "failed to validate external path '{}' for plugin {}",
875 path.display(),
876 name
877 )
878 })?;
879 path_info.insert(name.as_str(), (kind, path.clone()));
880 }
881
882 for (name, (kind, path)) in &path_info {
883 materialize_external_plugin(name, *kind, path, params, plugin_run_dir)?;
884 }
885 Ok(())
886}
887
888#[allow(clippy::struct_excessive_bools)]
889#[derive(Debug, Builder, Clone)]
890pub struct Params {
891 topology: Topology,
892 #[builder(default = "PathBuf::from(\"./tmp\")")]
893 data_dir: PathBuf,
894 #[builder(default = "false")]
895 disable_plugin_install: bool,
896 #[builder(default = "3000")]
897 base_bin_port: u16,
898 #[builder(default = "8000")]
899 base_http_port: u16,
900 #[builder(default = "PathBuf::from(\"picodata\")")]
901 picodata_path: PathBuf,
902 #[builder(default = "5432")]
903 base_pg_port: u16,
904 #[builder(default = "false")]
905 use_release: bool,
906 #[builder(default = "PathBuf::from(\"target\")")]
907 target_dir: PathBuf,
908 #[builder(default = "false")]
909 daemon: bool,
910 #[builder(default = "false")]
911 disable_colors: bool,
912 #[builder(default = "PathBuf::from(\"./\")")]
913 plugin_path: PathBuf,
914 #[builder(default = "false")]
915 no_build: bool,
916 #[builder(default = "PathBuf::from(\"./picodata.yaml\")")]
917 config_path: PathBuf,
918 #[builder(default)]
919 instance_name: Option<String>,
920 #[builder(default = "false")]
921 with_web_auth: bool,
922 #[builder(default = "false")]
923 with_audit: bool,
924 #[builder(default = "false")]
925 wait_vshard_discovery: bool,
926 #[builder(default = "300")]
927 wait_vshard_discovery_timeout: u64,
928}
929
930impl Params {
931 pub fn get_build_profile(&self) -> BuildType {
932 if self.use_release {
933 BuildType::Release
934 } else {
935 BuildType::Debug
936 }
937 }
938
939 pub fn get_plugins_dir(&self) -> PathBuf {
940 let build_profile = self.get_build_profile();
941 self.plugin_path
942 .join(self.target_dir.join(build_profile.to_string()))
943 }
944
945 pub fn get_cluster_dir(&self) -> PathBuf {
946 get_cluster_dir(&self.plugin_path, &self.data_dir)
947 }
948}
949
950fn configure_web_auth<F>(
951 picodata_path: &Path,
952 socket_path: &Path,
953 with_web_auth: bool,
954 run_admin: F,
955) -> Result<()>
956where
957 F: Fn(&Path, &Path, &str) -> Result<String>,
958{
959 if with_web_auth {
960 run_admin(picodata_path, socket_path, "ALTER SYSTEM RESET jwt_secret;")
961 .context("failed to enable WebUI authentication (RESET jwt_secret)")?;
962 info!("WebUI auth: включена (RESET jwt_secret).");
963 } else {
964 run_admin(
965 picodata_path,
966 socket_path,
967 "ALTER SYSTEM SET jwt_secret = '';",
968 )
969 .context("failed to disable WebUI authentication (SET jwt_secret='')")?;
970 info!("WebUI auth: отключена (jwt_secret='').");
971 }
972 Ok(())
973}
974
975fn apply_web_auth_setting(params: &Params, cluster_dir: &Path) -> Result<()> {
977 let Some(socket_path) = find_active_socket_path(cluster_dir)? else {
978 bail!("не удалось найти активный admin.sock для применения настройки WebUI auth");
979 };
980
981 let run_admin = |p: &Path, s: &Path, q: &str| run_query_in_picodata_admin(p, s, q);
982
983 if let Err(err) = configure_web_auth(
984 ¶ms.picodata_path,
985 &socket_path,
986 params.with_web_auth,
987 run_admin,
988 ) {
989 warn!("Не удалось применить WebUI auth настройку: {err:#}");
990 }
991
992 Ok(())
993}
994
995fn run_single_instance(
996 params: &Params,
997 plugins_dir: Option<&PathBuf>,
998) -> anyhow::Result<Vec<PicodataInstance>> {
999 let instance_name = params.instance_name.as_ref().unwrap().clone();
1000 let cluster_dir = params.get_cluster_dir();
1001
1002 if get_active_socket_path(&cluster_dir, &instance_name).is_some() {
1004 log_instance_skipped(instance_name);
1005 return Ok(vec![]);
1006 }
1007
1008 let dirs = fs::read_dir(&cluster_dir).context(format!(
1009 "cluster data dir with path {} does not exist",
1010 cluster_dir.to_string_lossy()
1011 ))?;
1012
1013 info!(
1014 "running picodata cluster instance '{instance_name}', data folder: {}",
1015 cluster_dir.join(&instance_name).to_string_lossy()
1016 );
1017
1018 let mut instance_dir = dirs
1020 .into_iter()
1021 .find_map(|result| {
1022 let dir_entry = result.ok()?;
1023 let filename = dir_entry.file_name();
1024
1025 let Some(dir_name) = filename.to_str() else {
1026 return None;
1028 };
1029
1030 if dir_name != instance_name {
1031 return None;
1032 }
1033
1034 Some(dir_entry.path())
1035 })
1036 .ok_or({
1037 anyhow::anyhow!("failed to locate directory of the instance '{instance_name}")
1038 })?;
1039
1040 if instance_dir.is_symlink() {
1041 instance_dir = fs::read_link(instance_dir)?;
1042 }
1043 let pico_instance_name = instance_dir
1044 .file_name()
1045 .expect("unreachable: canonicalized path cannot have .. as filename")
1046 .to_str();
1047 let instance_id = pico_instance_name
1048 .expect("unreachable: instance path should be convertible to str")[1..]
1049 .parse::<u16>()?;
1050
1051 let mut instance_id_counter = 0;
1052 let mut instance_tier_name = &String::new();
1053 for (tier_name, tier) in ¶ms.topology.tiers {
1054 instance_id_counter += u16::from(tier.replicasets * tier.replication_factor);
1055 if instance_id <= instance_id_counter {
1056 instance_tier_name = tier_name;
1057 break;
1058 }
1059 }
1060
1061 let pico_instance =
1062 PicodataInstance::new(instance_id, plugins_dir, instance_tier_name, params)?;
1063
1064 log_instance_started(instance_name);
1065
1066 apply_web_auth_setting(params, &cluster_dir)?;
1067
1068 Ok(vec![pico_instance])
1069}
1070
1071fn run_cluster(params: &Params, plugins_dir: Option<&PathBuf>) -> Result<Vec<PicodataInstance>> {
1072 assert!(params.instance_name.is_none(), "invariant");
1073
1074 let cluster_dir = params.get_cluster_dir();
1075 let picodata_version = get_picodata_version(¶ms.picodata_path)?;
1076
1077 info!("Running the cluster with {picodata_version}...");
1078 let start_cluster_run = Instant::now();
1079 let mut picodata_processes = start_instances_in_tiers(params, plugins_dir)?;
1080
1081 readiness::wait_instances_ready(&picodata_processes)?;
1082 apply_web_auth_setting(params, &cluster_dir)?;
1083
1084 if params.wait_vshard_discovery {
1085 readiness::wait_vshard_discovery(&picodata_processes, params)?;
1086 }
1087
1088 if !params.disable_plugin_install && !params.topology.plugins.is_empty() {
1089 if plugins_dir.is_none() {
1090 bail!("failed to enable plugins: directory with plugins is missing.")
1091 }
1092 info!("Enabling plugins...");
1093 let result = enable_plugins(¶ms.topology, &cluster_dir, ¶ms.picodata_path);
1094 if let Err(e) = result {
1095 for process in &mut picodata_processes {
1096 process.kill().unwrap_or_else(|e| {
1097 error!("failed to kill picodata instances: {e:#}");
1098 });
1099 }
1100 bail!("failed to enable plugins: {e}");
1101 }
1102 }
1103
1104 info!(
1105 "Picodata cluster has started (launch time: {} sec, total instances: {})",
1106 start_cluster_run.elapsed().as_secs(),
1107 picodata_processes.len()
1108 );
1109
1110 Ok(picodata_processes)
1111}
1112
1113fn start_instances_in_tiers(
1124 params: &Params,
1125 plugins_dir: Option<&PathBuf>,
1126) -> anyhow::Result<Vec<PicodataInstance>> {
1127 let cluster_dir = params.get_cluster_dir();
1128 let mut picodata_processes = vec![];
1129 let mut instance_id = 0;
1130
1131 for (tier_name, tier) in ¶ms.topology.tiers {
1132 info!("Starting instances in tier '{tier_name}' ...");
1133 for _ in 0..(tier.replicasets * tier.replication_factor) {
1134 instance_id += 1;
1135 let instance_name = PicodataInstance::make_name(instance_id);
1136
1137 if get_active_socket_path(&cluster_dir, &instance_name).is_some() {
1138 log_instance_skipped(instance_name);
1139 continue;
1140 }
1141
1142 let pico_instance = PicodataInstance::new(instance_id, plugins_dir, tier_name, params)?;
1143 picodata_processes.push(pico_instance);
1144
1145 log_instance_started(instance_name);
1146 }
1147 }
1148
1149 Ok(picodata_processes)
1150}
1151
1152fn prepare_directory_with_plugins(params: &mut Params) -> anyhow::Result<Option<PathBuf>> {
1153 if is_plugin_dir(¶ms.plugin_path) {
1154 let plugins_dir = params.get_plugins_dir();
1155 let build_profile = params.get_build_profile();
1156
1157 if !params.no_build {
1158 cargo_build(build_profile, ¶ms.target_dir, ¶ms.plugin_path)?;
1159 }
1160
1161 prepare_external_plugins(params, &plugins_dir)?;
1162 params.topology.find_plugin_versions(&plugins_dir)?;
1163 return Ok(Some(plugins_dir));
1164 }
1165
1166 if params.topology.has_external_plugins() {
1167 let plugins_dir = params.get_cluster_dir().join("plugins");
1170
1171 fs::create_dir_all(&plugins_dir).with_context(|| {
1172 format!(
1173 "failed to create directory for external plugins at {}",
1174 plugins_dir.display()
1175 )
1176 })?;
1177
1178 prepare_external_plugins(params, &plugins_dir)?;
1179 params.topology.find_plugin_versions(&plugins_dir)?;
1180 return Ok(Some(plugins_dir));
1181 }
1182
1183 if !params.topology.plugins.is_empty() {
1184 bail!(
1185 "failed to prepare plugins: plugin directory is unknown.\n\
1186 If you use external plugins, ensure they are prepared or run from a pike plugin project"
1187 )
1188 }
1189
1190 Ok(None)
1191}
1192
1193pub fn cluster(mut params: Params) -> Result<Vec<PicodataInstance>> {
1194 let plugins_dir = prepare_directory_with_plugins(&mut params)?;
1195
1196 if params.instance_name.is_some() {
1197 info!("Starting single cluster instance");
1198 return run_single_instance(¶ms, plugins_dir.as_ref());
1199 }
1200
1201 if let Some(sock_path) = find_active_socket_path(¶ms.get_cluster_dir())? {
1202 info!("Cluster is running (connected via {})", sock_path.display());
1203 return start_instances_in_tiers(¶ms, plugins_dir.as_ref());
1205 }
1206
1207 run_cluster(¶ms, plugins_dir.as_ref())
1208}
1209
1210fn print_webui_url(pico_instances: &[PicodataInstance]) {
1211 let Some(first) = pico_instances.first() else {
1212 return;
1213 };
1214
1215 let leader_name = get_health_status(first)
1219 .ok()
1220 .map(|s| s.raft.leader_name)
1221 .unwrap_or_default();
1222 let leader_instance = pico_instances
1224 .iter()
1225 .find(|i| !leader_name.is_empty() && i.properties().instance_name == leader_name);
1226 let port = leader_instance.unwrap_or(first).http_port();
1227
1228 let url = format!("http://localhost:{port}").bold();
1229 println!("\nCluster is running. To open Web UI, visit:\n {url}\n");
1230}
1231
1232pub fn cmd(params: Params) -> Result<()> {
1233 let is_daemon_mode = params.daemon;
1234 let mut pico_instances = cluster(params)?;
1235
1236 print_webui_url(&pico_instances);
1237
1238 if is_daemon_mode {
1239 return Ok(());
1240 }
1241
1242 let picodata_pids: Vec<u32> = pico_instances.iter().map(|p| p.child.id()).collect();
1246 ctrlc::set_handler(move || {
1247 info!("received Ctrl+C. Shutting down ...");
1248
1249 for &pid in &picodata_pids {
1250 let _ = kill(Pid::from_raw(pid.cast_signed()), Signal::SIGKILL);
1251 }
1252 })
1253 .context("failed to set Ctrl+c handler")?;
1254
1255 for instance in &mut pico_instances {
1257 instance.join();
1258 }
1259
1260 Ok(())
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use super::*;
1266 use crate::commands::lib::LIB_EXT;
1267 use flate2::write::GzEncoder;
1268 use flate2::Compression;
1269 use std::cell::RefCell;
1270 use std::time::{SystemTime, UNIX_EPOCH};
1271 use tar::Builder;
1272
1273 fn tmp_dir(prefix: &str) -> PathBuf {
1274 let ts = SystemTime::now()
1275 .duration_since(UNIX_EPOCH)
1276 .unwrap()
1277 .as_nanos();
1278 let mut dir = std::env::temp_dir();
1279 dir.push(format!("pike-run-ut-{prefix}-{ts}"));
1280 dir
1281 }
1282
1283 fn capture_runner(
1284 captured: &RefCell<Vec<String>>,
1285 ) -> impl Fn(&Path, &Path, &str) -> Result<String> + '_ {
1286 move |_: &Path, _: &Path, q: &str| -> Result<String> {
1287 captured.borrow_mut().push(q.to_string());
1288 Ok(String::new())
1289 }
1290 }
1291
1292 #[test]
1293 fn web_auth_config_enables_with_reset() {
1294 let picodata = Path::new("picodata");
1295 let sock = Path::new("/tmp/admin.sock");
1296 let captured: RefCell<Vec<String>> = RefCell::new(vec![]);
1297
1298 configure_web_auth(picodata, sock, true, capture_runner(&captured)).unwrap();
1299
1300 let calls = captured.borrow();
1301 assert_eq!(calls.len(), 1);
1302 assert!(
1303 calls[0].contains("ALTER SYSTEM RESET jwt_secret;"),
1304 "expected RESET query, got: {}",
1305 calls[0]
1306 );
1307 }
1308
1309 #[test]
1310 fn web_auth_config_clears_secret_when_disabled() {
1311 let picodata = Path::new("picodata");
1312 let sock = Path::new("/tmp/admin.sock");
1313 let captured: RefCell<Vec<String>> = RefCell::new(vec![]);
1314
1315 configure_web_auth(picodata, sock, false, capture_runner(&captured)).unwrap();
1316
1317 let calls = captured.borrow();
1318 assert_eq!(calls.len(), 1);
1319 assert!(
1320 calls[0].contains("ALTER SYSTEM SET jwt_secret = '';"),
1321 "expected query to clear secret, got: {}",
1322 calls[0]
1323 );
1324 }
1325
1326 fn temp_dir_unique(prefix: &str) -> PathBuf {
1327 let nanos = SystemTime::now()
1328 .duration_since(UNIX_EPOCH)
1329 .unwrap()
1330 .as_nanos();
1331 let dir = std::env::temp_dir().join(format!("{prefix}_{nanos}"));
1332 fs::create_dir_all(&dir).unwrap();
1333 dir
1334 }
1335
1336 #[test]
1337 fn merged_cluster_tier_config_ok_when_config_missing() {
1338 let plugin_dir = temp_dir_unique("pike_test_missing");
1339 let config_path = PathBuf::from("picodata.yaml");
1341
1342 let mut tiers = BTreeMap::new();
1343 tiers.insert(
1344 "t1".to_string(),
1345 Tier {
1346 replicasets: 1,
1347 replication_factor: 2,
1348 },
1349 );
1350
1351 let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1352 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1353 assert!(v.get("t1").is_some());
1354 assert_eq!(
1355 v["t1"]["replication_factor"].as_i64().unwrap(),
1356 2,
1357 "replication_factor from topology must be applied"
1358 );
1359 }
1360
1361 #[test]
1362 fn merged_cluster_tier_config_ok_when_config_empty() {
1363 let plugin_dir = temp_dir_unique("pike_test_empty");
1364 let config_path = PathBuf::from("picodata.yaml");
1365 let config_file = plugin_dir.join(&config_path);
1366 fs::write(&config_file, "").unwrap();
1367
1368 let mut tiers = BTreeMap::new();
1369 tiers.insert(
1370 "t1".to_string(),
1371 Tier {
1372 replicasets: 1,
1373 replication_factor: 3,
1374 },
1375 );
1376
1377 let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1378 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1379 assert_eq!(v["t1"]["replication_factor"].as_i64().unwrap(), 3);
1380 }
1381
1382 #[test]
1383 fn merged_cluster_tier_config_errors_on_invalid_yaml() {
1384 let plugin_dir = temp_dir_unique("pike_test_invalid");
1385 let config_path = PathBuf::from("picodata.yaml");
1386 let config_file = plugin_dir.join(&config_path);
1387 fs::write(&config_file, "cluster: [\n - broken").unwrap();
1389
1390 let mut tiers = BTreeMap::new();
1391 tiers.insert(
1392 "t1".to_string(),
1393 Tier {
1394 replicasets: 1,
1395 replication_factor: 1,
1396 },
1397 );
1398
1399 let res = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers);
1400 assert!(res.is_err(), "invalid YAML must return error");
1401 }
1402
1403 #[test]
1404 fn merged_cluster_tier_config_preserves_valid_config_and_overrides_replication_factor() {
1405 let plugin_dir = temp_dir_unique("pike_test_valid");
1406 let config_path = PathBuf::from("picodata.yaml");
1407 let config_file = plugin_dir.join(&config_path);
1408 let yaml = r#"
1409 cluster:
1410 tier:
1411 t1: null
1412 t2:
1413 replication_factor: 99
1414 custom: "keep"
1415 "#;
1416 fs::write(&config_file, yaml).unwrap();
1417
1418 let mut tiers = BTreeMap::new();
1419 tiers.insert(
1420 "t1".to_string(),
1421 Tier {
1422 replicasets: 1,
1423 replication_factor: 5,
1424 },
1425 );
1426 tiers.insert(
1427 "t2".to_string(),
1428 Tier {
1429 replicasets: 1,
1430 replication_factor: 7,
1431 },
1432 );
1433
1434 let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1435 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1436
1437 assert_eq!(v["t1"]["replication_factor"].as_i64().unwrap(), 5);
1439
1440 assert_eq!(v["t2"]["replication_factor"].as_i64().unwrap(), 7);
1442 assert_eq!(v["t2"]["custom"].as_str().unwrap(), "keep");
1443 }
1444
1445 #[test]
1446 fn merged_cluster_tier_config_errors_on_non_mapping_root() {
1447 let plugin_dir = temp_dir_unique("pike_test_non_mapping_root");
1448 let config_path = PathBuf::from("picodata.yaml");
1449 let config_file = plugin_dir.join(&config_path);
1450
1451 fs::write(&config_file, "- a\n- b\n- c\n").unwrap();
1452
1453 let mut tiers = BTreeMap::new();
1454 tiers.insert(
1455 "t1".to_string(),
1456 Tier {
1457 replicasets: 1,
1458 replication_factor: 1,
1459 },
1460 );
1461
1462 let err = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap_err();
1463 let msg = format!("{err:#}");
1464 assert!(
1465 msg.contains("expected YAML mapping"),
1466 "expected error about non-mapping root, got: {msg}"
1467 );
1468 }
1469
1470 #[test]
1471 fn enable_plugins_fails_on_missing_version() {
1472 let topology = Topology {
1473 tiers: BTreeMap::new(),
1474 plugins: {
1475 let mut m = BTreeMap::new();
1476 m.insert(
1477 "p".to_string(),
1478 Plugin {
1479 migration_context: vec![],
1480 services: BTreeMap::new(),
1481 version: None,
1482 path: None,
1483 },
1484 );
1485 m
1486 },
1487 enviroment: BTreeMap::new(),
1488 pre_install_sql: vec![],
1489 };
1490 let cluster_dir = temp_dir_unique("pike_test_cluster");
1491 let picodata_path = Path::new("picodata");
1492 let err = enable_plugins(&topology, &cluster_dir, picodata_path).unwrap_err();
1493 let msg = format!("{err:#}");
1494 assert!(
1495 msg.contains("plugin version is missing"),
1496 "Expected explicit error on missing version, got: {msg}"
1497 );
1498 }
1499
1500 #[test]
1501 fn merged_cluster_tier_config_errors_on_null_root() {
1502 let plugin_dir = temp_dir_unique("pike_test_null_root");
1503 let config_path = PathBuf::from("picodata.yaml");
1504 let config_file = plugin_dir.join(&config_path);
1505
1506 fs::write(&config_file, "null").unwrap();
1507
1508 let mut tiers = BTreeMap::new();
1509 tiers.insert(
1510 "t1".to_string(),
1511 Tier {
1512 replicasets: 1,
1513 replication_factor: 1,
1514 },
1515 );
1516
1517 let err = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap_err();
1518 assert!(
1519 format!("{err:#}").contains("expected YAML mapping"),
1520 "expected error about null root, got: {err}"
1521 );
1522 }
1523
1524 #[test]
1525 fn external_plugin_absolute_path_supported() {
1526 let base = tmp_dir("abs");
1528 let plugin_name = "abs_plugin";
1529 let version = "0.1.0";
1530 let shipping_root = base.join(plugin_name).join(version);
1531 fs::create_dir_all(&shipping_root).unwrap();
1532 fs::write(shipping_root.join("manifest.yaml"), "name: abs_plugin\n").unwrap();
1533
1534 let kind = get_external_plugin_path_kind(&base.join(plugin_name)).unwrap();
1535 assert_eq!(kind, PluginPathKind::ShippingDirectory);
1536 }
1537
1538 #[test]
1539 fn materialize_external_plugin_shipping_directory() {
1540 let base = tmp_dir("mat_dir");
1541 let src = base.join("test_plugin");
1542 let dst = base.join("dst");
1543 fs::create_dir_all(&src).unwrap();
1544 fs::create_dir_all(&dst).unwrap();
1545
1546 fs::write(src.join("manifest.yaml"), "name: test\n").unwrap();
1547
1548 let topology = Topology::default();
1549 let params = ParamsBuilder::default().topology(topology).build().unwrap();
1550
1551 materialize_external_plugin(
1552 "test_plugin",
1553 PluginPathKind::ShippingDirectory,
1554 &src,
1555 ¶ms,
1556 &dst,
1557 )
1558 .unwrap();
1559
1560 assert!(dst.join("test_plugin/manifest.yaml").exists());
1561 }
1562
1563 #[test]
1564 fn materialize_external_plugin_shipping_archive() {
1565 let base = tmp_dir("mat_arc");
1566 fs::create_dir_all(&base).unwrap();
1567 let archive_path = base.join("plugin.tar.gz");
1568 let dst = base.join("dst");
1569 fs::create_dir_all(&dst).unwrap();
1570
1571 {
1573 let file = File::create(&archive_path).unwrap();
1574 let enc = GzEncoder::new(file, Compression::default());
1575 let mut tar = Builder::new(enc);
1576
1577 let header_path_manifest = Path::new("test_plugin/0.1.0/manifest.yaml");
1580 let mut header = tar::Header::new_gnu();
1581 header.set_size(0);
1582 header.set_cksum();
1583 tar.append_data(&mut header, header_path_manifest, &mut "".as_bytes())
1584 .unwrap();
1585
1586 let lib_name = format!("test_plugin/0.1.0/libtest.{LIB_EXT}");
1587 let header_path_lib = Path::new(&lib_name);
1588 let mut header_lib = tar::Header::new_gnu();
1589 header_lib.set_size(0);
1590 header_lib.set_cksum();
1591 tar.append_data(&mut header_lib, header_path_lib, &mut "".as_bytes())
1592 .unwrap();
1593
1594 tar.finish().unwrap();
1595 }
1596
1597 let topology = Topology::default();
1598 let params = ParamsBuilder::default().topology(topology).build().unwrap();
1599
1600 materialize_external_plugin(
1601 "test_plugin",
1602 PluginPathKind::ShippingArchive,
1603 &archive_path,
1604 ¶ms,
1605 &dst,
1606 )
1607 .unwrap();
1608
1609 let unpacked_manifest = dst.join("test_plugin/0.1.0/manifest.yaml");
1610 assert!(
1611 unpacked_manifest.exists(),
1612 "manifest should be unpacked at {}",
1613 unpacked_manifest.display()
1614 );
1615 }
1616
1617 #[test]
1618 fn materialize_external_plugin_crate_no_build() {
1619 let base = tmp_dir("mat_crate");
1620 let plugin_path = base.join("my_plugin");
1621 let artifact_path = plugin_path.join("target/debug/my_plugin");
1623 fs::create_dir_all(&artifact_path).unwrap();
1624 fs::write(artifact_path.join("manifest.yaml"), "artifact").unwrap();
1625
1626 let dst = base.join("dst");
1627 fs::create_dir_all(&dst).unwrap();
1628
1629 let topology = Topology::default();
1630 let params = ParamsBuilder::default()
1631 .topology(topology)
1632 .no_build(true)
1633 .target_dir(PathBuf::from("target"))
1634 .build()
1635 .unwrap();
1636
1637 materialize_external_plugin(
1638 "my_plugin",
1639 PluginPathKind::CrateOrWorkspaceDirectory,
1640 &plugin_path,
1641 ¶ms,
1642 &dst,
1643 )
1644 .unwrap();
1645
1646 assert!(dst.join("my_plugin/manifest.yaml").exists());
1647 }
1648 #[test]
1649 fn test_topology_deserialization_with_pre_install_sql() {
1650 let toml_str = r#"
1651 pre_install_sql = [
1652 'CREATE TABLE "t" ("id" INT PRIMARY KEY);',
1653 "ALTER SYSTEM SET param = 'value';",
1654 '''
1655 ALTER SYSTEM SET multiline = 'test';
1656 '''
1657 ]
1658 [tier.default]
1659 replicasets = 1
1660 replication_factor = 1
1661 "#;
1662 let topology: Topology = toml::from_str(toml_str).unwrap();
1663 assert_eq!(topology.pre_install_sql.len(), 3);
1664 assert_eq!(
1665 topology.pre_install_sql[0].trim(),
1666 "CREATE TABLE \"t\" (\"id\" INT PRIMARY KEY);"
1667 );
1668 assert_eq!(
1669 topology.pre_install_sql[1].trim(),
1670 "ALTER SYSTEM SET param = 'value';"
1671 );
1672 assert_eq!(
1673 topology.pre_install_sql[2].trim(),
1674 "ALTER SYSTEM SET multiline = 'test';"
1675 );
1676 }
1677}