Skip to main content

pike/commands/
run.rs

1mod readiness;
2
3use crate::healthcheck::api::get_health_status;
4use anyhow::{anyhow, bail, Context, Result};
5use colored::Colorize;
6use derive_builder::Builder;
7use log::{error, info, warn};
8use nix::sys::signal::{kill, Signal};
9use nix::unistd::Pid;
10use rand::RngExt;
11use serde::Deserialize;
12use serde_yaml::{Mapping, Value};
13use std::collections::{BTreeMap, HashMap};
14use std::fs;
15use std::fs::{File, OpenOptions};
16use std::io::{BufRead, BufReader, ErrorKind, Read, Write};
17use std::net::SocketAddrV4;
18use std::os::unix::fs::symlink;
19use std::path::{Path, PathBuf};
20use std::process::{Child, Command, Stdio};
21use std::str::{self};
22use std::sync::{Arc, Mutex};
23use std::thread::{self, JoinHandle};
24use std::time::{Duration, Instant};
25
26use crate::commands::lib::instance_info::InstanceSocketClient;
27use crate::commands::lib::{
28    cargo_build, copy_directory_tree, find_active_socket_path, get_cluster_dir,
29    log_instance_skipped, log_instance_started, run_query_in_picodata_admin, spawn_picodata_admin,
30    unpack_shipping_archive,
31};
32use crate::commands::lib::{get_active_socket_path, BuildType};
33use crate::commands::lib::{is_plugin_archive, is_plugin_dir, is_plugin_shipping_dir};
34
/// ASCII-art banner printed to stdout by `get_picodata_version` when the
/// picodata executable cannot be found.
const BAFFLED_WHALE: &str = r"
  __________________________________________________________
/ Iiiiiiiiit seeeeeems Piiiiicooooodaaaaataaaaaa iiiiiiiiis \
| noooooooot iiiiiinstaaaaaalleeeeed oooooon yyyyyooooouuur |
\ syyyyyyysteeeeeeem.                                       /
  ----------------------------------------------------------
  |
  |
  |     .-------------'```'----....,,__                        _,
  |    |                               `'`'`'`'-.,.__        .'(
  |    |                                             `'--._.'   )
  |    |                                                   `'-.<
  |    \               .-'`'-.                            -.    `\
   \    \               -.o_.     _                     _,-'`\    |
    \_   ``````''--.._.-=-._    .'  \            _,,--'`      `-._(
          (^^^^^^^^`___    '-. |    \  __,,..--'                 `
           `````````   `'--..___\    |`
                                 `-.,'
 ";
54
/// Upper bound on how long `PicodataInstance::new` keeps polling a freshly
/// spawned instance before it stops waiting for it to become online.
const TIMEOUT_WAITING_FOR_INSTANCE_READINESS: Duration = Duration::from_secs(10);
56
/// Tier description deserialized from the topology file.
#[derive(Debug, Deserialize, Clone)]
pub struct Tier {
    /// Presumably the number of replicasets in the tier; not consumed in the
    /// visible part of this file (TODO confirm against the caller).
    pub replicasets: u8,
    /// Replication factor; written as `replication_factor` into the merged
    /// `cluster.tier` configuration passed to picodata.
    pub replication_factor: u8,
}
62
/// A single `name = value` pair applied to a plugin's migration context
/// via `ALTER PLUGIN ... SET migration_context.<name>='<value>'`.
#[derive(Debug, Deserialize, Clone)]
pub struct MigrationContextVar {
    /// Name of the migration context variable.
    pub name: String,
    /// Value assigned to the variable (interpolated into the query verbatim).
    pub value: String,
}
68
/// Plugin service description from the topology file.
#[derive(Default, Debug, Deserialize, Clone)]
pub struct Service {
    /// Names of the tiers the service is added to
    /// (`ALTER PLUGIN ... ADD SERVICE ... TO TIER ...`).
    pub tiers: Vec<String>,
}
73
/// Describes what an external plugin path points at.
///
/// The kind decides how the plugin is brought into the run directory:
/// built with cargo, copied, or unpacked.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PluginPathKind {
    /// Cargo workspace or a standalone project
    CrateOrWorkspaceDirectory,
    /// Directory with versioned plugin contents
    ShippingDirectory,
    /// Archive with shipping directory inside
    ShippingArchive,
}
84
/// Plugin entry of the topology file.
#[derive(Default, Debug, Deserialize, Clone)]
pub struct Plugin {
    /// Variables set on the plugin's migration context before migrations run.
    #[serde(default)]
    pub migration_context: Vec<MigrationContextVar>,
    /// Services of the plugin keyed by service name (`service` key in the file).
    #[serde(default)]
    #[serde(rename = "service")]
    pub services: BTreeMap<String, Service>,
    /// Plugin version; never read from the topology file, filled in later by
    /// `Topology::find_plugin_versions`.
    #[serde(skip)]
    pub version: Option<String>,
    /// Path to plugin, if it is located outside current directory.
    ///
    /// Supported formats:
    /// - Relative
    /// - Absolute
    ///
    /// Path should conform to one of path kinds, see [`PluginPathKind`]
    pub path: Option<PathBuf>,
}
103
impl Plugin {
    /// Returns `true` when an explicit `path` is configured, i.e. the plugin
    /// is located outside the current project.
    fn is_external(&self) -> bool {
        self.path.is_some()
    }
}
109
/// Cluster topology deserialized from the topology TOML file.
#[derive(Default, Debug, Deserialize, Clone)]
pub struct Topology {
    /// Tiers keyed by tier name (`tier` key in the file).
    #[serde(rename = "tier")]
    pub tiers: BTreeMap<String, Tier>,
    /// Plugins keyed by plugin name (`plugin` key in the file).
    #[serde(rename = "plugin")]
    #[serde(default)]
    pub plugins: BTreeMap<String, Plugin>,
    /// Environment variables for instances: name -> minijinja template that is
    /// rendered with `instance_id`. The spelling is the key read from the file.
    #[serde(default)]
    pub enviroment: BTreeMap<String, String>,
    /// SQL statements sent to picodata admin before any plugin is created.
    #[serde(default)]
    pub pre_install_sql: Vec<String>,
}
122
123impl Topology {
124    /// Parse topology toml file and validate the fields.
125    /// Emit warning upon meeting alien fields.
126    pub fn parse_toml(path: &PathBuf) -> Result<Self> {
127        let content = fs::read_to_string(path)
128            .context(format!("failed to read topology from {}", path.display()))?;
129
130        let deserializer = toml::de::Deserializer::parse(&content)
131            .context(format!("failed to parse topology from {}", path.display()))?;
132
133        serde_ignored::deserialize(deserializer, |f| {
134            warn!("Unknown field in topology TOML: {f}");
135        })
136        .map_err(anyhow::Error::from)
137    }
138
139    fn find_plugin_versions(&mut self, plugins_dir: &Path) -> Result<()> {
140        for (plugin_name, plugin) in &mut self.plugins {
141            let current_plugin_dir = plugins_dir.join(plugin_name);
142
143            if !current_plugin_dir.exists() {
144                bail!(
145                    "plugin directory {} does not exist",
146                    current_plugin_dir.display()
147                );
148            }
149            let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)?
150                .map(|r| r.unwrap())
151                .collect();
152            versions.sort_by_key(fs::DirEntry::path);
153            let newest_version = versions
154                .last()
155                .unwrap()
156                .file_name()
157                .to_str()
158                .unwrap()
159                .to_string();
160            plugin.version = Some(newest_version);
161        }
162        Ok(())
163    }
164
165    fn has_external_plugins(&self) -> bool {
166        self.plugins.values().any(Plugin::is_external)
167    }
168}
169
170fn enable_plugins(topology: &Topology, cluster_dir: &Path, picodata_path: &Path) -> Result<()> {
171    let mut queries: Vec<String> = Vec::new();
172
173    if !topology.pre_install_sql.is_empty() {
174        info!("Executing pre-install SQL scripts...");
175        for query in &topology.pre_install_sql {
176            queries.push(query.clone());
177        }
178    }
179
180    for (plugin_name, plugin) in &topology.plugins {
181        let Some(plugin_version) = plugin.version.as_ref() else {
182            bail!("plugin version is missing for '{plugin_name}'");
183        };
184
185        // create plugin
186        queries.push(format!(
187            r#"CREATE PLUGIN "{plugin_name}" {plugin_version};"#
188        ));
189
190        // add services to tiers
191        for (service_name, service) in &plugin.services {
192            for tier_name in &service.tiers {
193                queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
194            }
195        }
196
197        // add migration context
198        for migration_env in &plugin.migration_context {
199            queries.push(format!(
200                "ALTER PLUGIN \"{plugin_name}\" {plugin_version} SET migration_context.{}='{}';",
201                migration_env.name, migration_env.value
202            ));
203        }
204
205        // run migrations
206        queries.push(format!(
207            r#"ALTER PLUGIN "{plugin_name}" MIGRATE TO {plugin_version};"#
208        ));
209
210        // enable plugin
211        queries.push(format!(
212            r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ENABLE;"#
213        ));
214    }
215
216    let admin_socket = cluster_dir.join("i1").join("admin.sock");
217
218    for query in queries {
219        info!("picodata admin: {query}");
220
221        let mut picodata_admin = spawn_picodata_admin(picodata_path, &admin_socket)?;
222
223        {
224            let picodata_stdin = picodata_admin.stdin.as_mut().unwrap();
225            picodata_stdin
226                .write_all(query.as_bytes())
227                .context("failed to send plugin installation queries")?;
228        }
229
230        let exit_code = picodata_admin
231            .wait()
232            .context("failed to wait for picodata admin")?
233            .code()
234            .unwrap();
235
236        let outputs: [Box<dyn Read + Send>; 2] = [
237            Box::new(picodata_admin.stdout.unwrap()),
238            Box::new(picodata_admin.stderr.unwrap()),
239        ];
240
241        let mut ignore_errors = false;
242        for output in outputs {
243            let reader = BufReader::new(output);
244            for line in reader.lines() {
245                let line = line.expect("failed to read picodata admin output");
246                info!("picodata admin: {line}");
247
248                // Ignore some types of error messages like re-enabling the plugin
249                let err_messages_to_ignore: Vec<&str> = vec!["already enabled", "already exists"];
250                for err_message in err_messages_to_ignore {
251                    if line.contains(err_message) {
252                        ignore_errors = true;
253                    }
254                }
255            }
256        }
257
258        if exit_code == 1 && !ignore_errors {
259            bail!("failed to execute picodata query {query}");
260        }
261    }
262
263    for (plugin_name, plugin) in &topology.plugins {
264        info!(
265            "Plugin {plugin_name}:{} has been enabled",
266            plugin.version.as_ref().unwrap()
267        );
268    }
269
270    Ok(())
271}
272
/// Looks up `variable` in the rendered environment and parses it as an
/// `ip:port` IPv4 socket address.
///
/// Returns `None` when the variable is not set.
///
/// # Panics
///
/// Panics if the value is not a valid IPv4 socket address, or if its IP is
/// neither loopback nor unspecified.
fn get_ipv4_from_template_var(
    env_vars: &BTreeMap<String, String>,
    variable: &str,
) -> Option<SocketAddrV4> {
    let raw_value = env_vars.get(variable)?;

    let address: SocketAddrV4 = match raw_value.parse() {
        Ok(address) => address,
        Err(e) => {
            panic!("could not parse {variable} to an ipv4 address: {e}. hint: use 127.0.0.1")
        }
    };

    // Only addresses local to this machine are usable.
    let is_local = address.ip().is_loopback() || address.ip().is_unspecified();
    assert!(
        is_local,
        "ipv4 address {address:?} of variable {variable} \
        is not loopback (127.0.0.1) or unspecified (0.0.0.0), \
        so it can't be used in pike."
    );

    Some(address)
}
290
291fn get_picodata_version(picodata_path: &PathBuf) -> Result<String> {
292    let picodata_output = Command::new(picodata_path).arg("--version").output();
293
294    let picodata_output = match picodata_output {
295        Ok(o) => o,
296        Err(err) if err.kind() == ErrorKind::NotFound => {
297            println!("{BAFFLED_WHALE}");
298            bail!("Picodata not found")
299        }
300        Err(err) => bail!("failed to get picodata version ({err})"),
301    };
302
303    Ok(str::from_utf8(&picodata_output.stdout)?.to_string())
304}
305
/// Borrowed, read-only view of a [`PicodataInstance`], produced by
/// `PicodataInstance::properties`.
#[allow(dead_code)]
pub struct PicodataInstanceProperties<'a> {
    /// Port of the iproto (binary protocol) listener.
    pub bin_port: &'a u16,
    /// Port of the pg listener.
    pub pg_port: &'a u16,
    /// Port of the HTTP listener.
    pub http_port: &'a u16,
    /// Instance data directory.
    pub data_dir: &'a Path,
    /// Instance name.
    pub instance_name: &'a str,
    /// Name of the tier the instance was started in.
    pub tier: &'a str,
    /// Numeric id the instance was created with.
    pub instance_id: &'a u16,
}
316
/// A picodata process spawned by pike together with its bookkeeping data.
#[derive(Debug)]
pub struct PicodataInstance {
    /// Starts as `i<instance_id>`; replaced with the name reported by the
    /// instance once it becomes online.
    instance_name: String,
    /// Numeric id the instance was created with.
    instance_id: u16,
    /// Name of the tier passed via `--tier`.
    tier: String,
    /// Threads forwarding the child's stdout/stderr; `None` until
    /// `capture_logs` runs (it is skipped in daemon mode).
    log_threads: Option<Vec<JoinHandle<()>>>,
    /// Handle of the spawned picodata process.
    child: Child,
    /// Whether the instance runs detached; `Drop` does not wait for it then.
    daemon: bool,
    /// Disables coloring of the log prefix.
    disable_colors: bool,
    /// Instance data directory (`<cluster_dir>/i<instance_id>`).
    data_dir: PathBuf,
    /// Path of `picodata.log` inside the data directory.
    log_file_path: PathBuf,
    /// Port of the pg listener.
    pg_port: u16,
    /// Port of the iproto listener.
    bin_port: u16,
    /// Port of the HTTP listener.
    http_port: u16,
}
332
333impl PicodataInstance {
334    #[allow(clippy::too_many_lines)]
335    fn new(
336        instance_id: u16,
337        plugins_dir: Option<&PathBuf>,
338        tier: &str,
339        run_params: &Params,
340    ) -> Result<Self> {
341        // Properties
342        let mut instance_name = Self::make_name(instance_id);
343        let tiers_config = get_merged_cluster_tier_config(
344            &run_params.plugin_path,
345            &run_params.config_path,
346            &run_params.topology.tiers,
347        )?;
348
349        // Paths
350        let cluster_dir = get_cluster_dir(&run_params.plugin_path, &run_params.data_dir);
351        let instance_data_dir = cluster_dir.join(&instance_name);
352        let log_file_path = instance_data_dir.join("picodata.log");
353        let audit_file_path = instance_data_dir.join("audit.log");
354
355        fs::create_dir_all(&instance_data_dir).context("Failed to create instance data dir")?;
356
357        let mut template_env = minijinja::Environment::new();
358
359        for x in &run_params.topology.enviroment {
360            template_env.add_template(x.0.as_str(), x.1.as_str())?;
361        }
362
363        let env_templates_ctx = minijinja::context! {
364            instance_id => instance_id,
365        };
366        let env_vars: BTreeMap<String, String> =
367            Self::compute_env_vars(&template_env, &env_templates_ctx)?;
368
369        let first_env_vars: BTreeMap<String, String> = Self::compute_env_vars(
370            &template_env,
371            &minijinja::context! {
372                instance_id => 1,
373            },
374        )?;
375
376        let mut child = Command::new(&run_params.picodata_path);
377        child.envs(&env_vars);
378
379        let first_instance_bin_ipv4 =
380            get_ipv4_from_template_var(&first_env_vars, "PICODATA_IPROTO_LISTEN")
381                .unwrap_or(format!("127.0.0.1:{}", run_params.base_bin_port + 1).parse()?);
382        let bin_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_IPROTO_LISTEN")
383            .unwrap_or(format!("127.0.0.1:{}", run_params.base_bin_port + instance_id).parse()?);
384        let http_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_HTTP_LISTEN")
385            .unwrap_or(format!("0.0.0.0:{}", run_params.base_http_port + instance_id).parse()?);
386        let pg_ipv4 = get_ipv4_from_template_var(&env_vars, "PICODATA_PG_LISTEN")
387            .unwrap_or(format!("127.0.0.1:{}", run_params.base_pg_port + instance_id).parse()?);
388
389        child.args([
390            "run",
391            "--instance-dir",
392            instance_data_dir.to_str().expect("unreachable"),
393            "--iproto-listen",
394            &bin_ipv4.to_string(),
395            "--peer",
396            &first_instance_bin_ipv4.to_string(),
397            "--http-listen",
398            &http_ipv4.to_string(),
399            "--pg-listen",
400            &pg_ipv4.to_string(),
401            "--tier",
402            tier,
403            "--config-parameter",
404            &format!("cluster.tier={tiers_config}",),
405        ]);
406
407        let config_path = run_params.plugin_path.join(&run_params.config_path);
408        if config_path.exists() {
409            // Template the config file with instance_id
410            let instance_config_path =
411                Self::template_config(&config_path, &instance_data_dir, &env_templates_ctx)?;
412            child.args([
413                "--config",
414                instance_config_path.to_str().unwrap_or("./picodata.yaml"),
415            ]);
416        } else {
417            warn!(
418                "couldn't locate picodata config at {} - skipping.",
419                config_path.to_str().unwrap()
420            );
421        }
422
423        if let Some(plugins_dir) = plugins_dir {
424            child.args([
425                "--share-dir",
426                plugins_dir.to_str().unwrap_or("target/debug"),
427            ]);
428        }
429
430        if run_params.daemon {
431            child.stdout(Stdio::null()).stderr(Stdio::null());
432            child.args(["--log", log_file_path.to_str().expect("unreachable")]);
433        } else {
434            child.stdout(Stdio::piped()).stderr(Stdio::piped());
435        }
436
437        if run_params.with_audit {
438            child.args(["--audit", audit_file_path.to_str().expect("unreachable")]);
439        }
440
441        let child = child
442            .spawn()
443            .context(format!("failed to start picodata instance: {instance_id}"))?;
444
445        let start = Instant::now();
446        while Instant::now().duration_since(start) < TIMEOUT_WAITING_FOR_INSTANCE_READINESS {
447            thread::sleep(Duration::from_millis(100));
448            let socket_client =
449                InstanceSocketClient::new(&instance_data_dir, &run_params.picodata_path);
450            let Ok(new_instance_name) = socket_client
451                .instance_name()
452                .inspect_err(|err| log::debug!("failed to get name of the instance: {err}"))
453            else {
454                continue;
455            };
456
457            // If name is already known, then socket is ready, i.e. we assume
458            // call below should return without error.
459            let instance_current_state = socket_client.current_state()?;
460            if !instance_current_state.is_online() {
461                info!("Waiting for '{new_instance_name}' to become 'Online'");
462                continue;
463            }
464
465            // create symlink to real instance data dir
466            let symlink_name = cluster_dir.join(&new_instance_name);
467            let _ = fs::remove_file(&symlink_name);
468            symlink(&instance_name, symlink_name)
469                .context("failed create symlink to instance dir")?;
470
471            instance_name = new_instance_name;
472            break;
473        }
474
475        let mut pico_instance = PicodataInstance {
476            instance_name,
477            tier: tier.to_string(),
478            log_threads: None,
479            child,
480            daemon: run_params.daemon,
481            disable_colors: run_params.disable_colors,
482            data_dir: instance_data_dir,
483            log_file_path,
484            pg_port: pg_ipv4.port(),
485            bin_port: bin_ipv4.port(),
486            http_port: http_ipv4.port(),
487            instance_id,
488        };
489
490        if !run_params.daemon {
491            pico_instance.capture_logs()?;
492        }
493
494        // Save pid of picodata process to kill it after
495        pico_instance.make_pid_file()?;
496
497        Ok(pico_instance)
498    }
499
    /// Returns the port of the instance's HTTP listener.
    pub(crate) fn http_port(&self) -> u16 {
        self.http_port
    }
503
    /// Creates an [`InstanceSocketClient`] for this instance's data directory
    /// that uses the picodata binary at `picodata_path`.
    pub(crate) fn socket_client<'a>(&self, picodata_path: &'a PathBuf) -> InstanceSocketClient<'a> {
        InstanceSocketClient::new(&self.data_dir, picodata_path)
    }
507
508    #[allow(dead_code)]
509    #[allow(clippy::must_use_candidate)]
510    pub fn properties(&self) -> PicodataInstanceProperties<'_> {
511        PicodataInstanceProperties {
512            bin_port: &self.bin_port,
513            pg_port: &self.pg_port,
514            http_port: &self.http_port,
515            data_dir: &self.data_dir,
516            instance_name: &self.instance_name,
517            tier: &self.tier,
518            instance_id: &self.instance_id,
519        }
520    }
521
522    fn compute_env_vars(
523        template_env: &minijinja::Environment,
524        ctx: &minijinja::Value,
525    ) -> Result<BTreeMap<String, String>> {
526        let mut result = BTreeMap::new();
527
528        for (name, template) in template_env.templates() {
529            result.insert(name.into(), template.render(ctx)?);
530        }
531
532        Ok(result)
533    }
534
535    /// Templates a picodata.yaml config file with jinja variables (e.g., `instance_id`).
536    /// Returns the path to the templated config file in the instance data directory.
537    fn template_config(
538        config_path: &Path,
539        instance_data_dir: &Path,
540        ctx: &minijinja::Value,
541    ) -> Result<PathBuf> {
542        let config_content = fs::read_to_string(config_path).with_context(|| {
543            format!(
544                "failed to read picodata config at {}",
545                config_path.display()
546            )
547        })?;
548
549        let mut parser = minijinja::Environment::new();
550
551        parser.add_template("config", &config_content)?;
552
553        let template = parser.get_template("config")?;
554
555        let rendered = template.render(ctx).with_context(|| {
556            format!(
557                "failed to render picodata config template at {}",
558                config_path.display()
559            )
560        })?;
561
562        let instance_config_path = instance_data_dir.join("picodata.yaml");
563        fs::write(&instance_config_path, &rendered).with_context(|| {
564            format!(
565                "failed to write templated picodata config to {}",
566                instance_config_path.display()
567            )
568        })?;
569
570        Ok(instance_config_path)
571    }
572
573    fn capture_logs(&mut self) -> Result<()> {
574        let mut rnd = rand::rng();
575        let instance_name_color = colored::CustomColor::new(
576            rnd.random_range(30..220),
577            rnd.random_range(30..220),
578            rnd.random_range(30..220),
579        );
580
581        let file = OpenOptions::new()
582            .create(true)
583            .truncate(true)
584            .write(true)
585            .open(&self.log_file_path)
586            .expect("Failed to open log file");
587        let file = Arc::new(Mutex::new(file));
588
589        let mut log_threads = vec![];
590
591        let stdout = self.child.stdout.take().expect("Failed to capture stdout");
592        let stderr = self.child.stderr.take().expect("Failed to capture stderr");
593        let outputs: [Box<dyn Read + Send>; 2] = [Box::new(stdout), Box::new(stderr)];
594        for child_output in outputs {
595            let mut log_prefix = format!("{}: ", self.instance_name);
596            if !self.disable_colors {
597                log_prefix = log_prefix.custom_color(instance_name_color).to_string();
598            }
599            let file = file.clone();
600
601            let wrapper = move || {
602                let stdout_lines = BufReader::new(child_output).lines();
603                for line in stdout_lines {
604                    let line = line.unwrap();
605                    println!("{log_prefix}{line}");
606                    writeln!(file.lock().unwrap(), "{line}")
607                        .expect("Failed to write line to log file");
608                }
609            };
610
611            let t = thread::Builder::new()
612                .name(format!("log_catcher::{}", self.instance_name))
613                .spawn(wrapper)?;
614
615            log_threads.push(t);
616        }
617
618        self.log_threads = Some(log_threads);
619
620        Ok(())
621    }
622
623    fn make_pid_file(&self) -> Result<()> {
624        let pid = self.child.id();
625        let pid_location = self.data_dir.join("pid");
626        let mut file = File::create(pid_location)?;
627        writeln!(file, "{pid}")?;
628        Ok(())
629    }
630
631    fn make_name(id: u16) -> String {
632        format!("i{id}")
633    }
634
635    fn kill(&mut self) -> Result<()> {
636        Ok(self.child.kill()?)
637    }
638
639    fn join(&mut self) {
640        let Some(threads) = self.log_threads.take() else {
641            return;
642        };
643        for h in threads {
644            h.join()
645                .expect("Failed to join thread for picodata instance");
646        }
647    }
648}
649
650impl Drop for PicodataInstance {
651    fn drop(&mut self) {
652        if self.daemon {
653            return;
654        }
655
656        self.child
657            .wait()
658            .expect("Failed to wait for picodata instance");
659    }
660}
661
662fn get_merged_cluster_tier_config(
663    plugin_path: &Path,
664    config_path: &Path,
665    tiers: &BTreeMap<String, Tier>,
666) -> Result<String> {
667    let picodata_conf_path = plugin_path.join(config_path);
668
669    // Отсутствие файла — валидный пустой конфиг
670    let maybe_raw = fs::read_to_string(&picodata_conf_path);
671    let root_value: Value = match maybe_raw {
672        Err(e) if e.kind() == ErrorKind::NotFound => Value::Mapping(Mapping::new()),
673        Err(e) => {
674            return Err(e).with_context(|| {
675                format!(
676                    "failed to read picodata config at {}",
677                    picodata_conf_path.display()
678                )
679            })
680        }
681        Ok(raw) => {
682            // Пустой файл — валидный пустой конфиг
683            if raw.trim().is_empty() {
684                Value::Mapping(Mapping::new())
685            } else {
686                serde_yaml::from_str::<Value>(&raw).with_context(|| {
687                    format!(
688                        "invalid YAML in picodata config at {}",
689                        picodata_conf_path.display()
690                    )
691                })?
692            }
693        }
694    };
695
696    let root_map = match root_value {
697        Value::Mapping(m) => m,
698        other => {
699            bail!(
700                "invalid root in picodata config at {}: expected YAML mapping object, got {:?}",
701                picodata_conf_path.display(),
702                other
703            );
704        }
705    };
706
707    let cluster_params = root_map
708        .get("cluster")
709        .and_then(Value::as_mapping)
710        .cloned()
711        .unwrap_or_else(Mapping::new);
712
713    let mut tier_params = cluster_params
714        .get("tier")
715        .and_then(Value::as_mapping)
716        .cloned()
717        .unwrap_or_else(Mapping::new);
718
719    for value in tier_params.values_mut() {
720        if value.is_null() {
721            *value = Value::Mapping(Mapping::new());
722        }
723    }
724
725    for (tier_name, tier_value) in tiers {
726        tier_params
727            .entry(Value::String(tier_name.clone()))
728            .and_modify(|entry| {
729                if let Value::Mapping(ref mut map) = entry {
730                    map.insert(
731                        Value::String("replication_factor".into()),
732                        Value::Number(tier_value.replication_factor.into()),
733                    );
734                }
735            })
736            .or_insert_with(|| {
737                let mut map = Mapping::new();
738                map.insert(
739                    Value::String("replication_factor".into()),
740                    Value::Number(tier_value.replication_factor.into()),
741                );
742                Value::Mapping(map)
743            });
744    }
745
746    serde_json::to_string(&tier_params).context("failed to serialize cluster.tier to JSON")
747}
748
749fn get_external_plugin_path_kind(path: &Path) -> Result<PluginPathKind> {
750    // Forbid symlinks explicitly to avoid confusing copies/unpacks
751    if path
752        .symlink_metadata()
753        .map(|m| m.file_type().is_symlink())
754        .unwrap_or(false)
755    {
756        bail!(
757            "symlink as external plugin path is not supported: {}",
758            path.display()
759        );
760    }
761
762    let meta = path.metadata().with_context(|| {
763        format!(
764            "failed to query external plugin path metadata at {}",
765            path.display()
766        )
767    })?;
768
769    if meta.is_file() {
770        return match is_plugin_archive(path) {
771            Ok(()) => Ok(PluginPathKind::ShippingArchive),
772            Err(error) => Err(error.context("external plugin path is an unknown file")),
773        };
774    }
775    if meta.is_dir() {
776        if is_plugin_dir(path) {
777            return Ok(PluginPathKind::CrateOrWorkspaceDirectory);
778        }
779        return match is_plugin_shipping_dir(path) {
780            Ok(()) => Ok(PluginPathKind::ShippingDirectory),
781            Err(error) => {
782                Err(error.context("external plugin path directory has invalid structure"))
783            }
784        };
785    }
786    // should be unreachable
787    bail!("unknown external plugin path type: '{}'", path.display());
788}
789
790/// Helper function for `prepare_external_plugins` receding repeating code
791fn materialize_external_plugin(
792    name: &str,
793    kind: PluginPathKind,
794    path: &PathBuf,
795    params: &Params,
796    plugin_run_dir: &Path,
797) -> Result<()> {
798    match kind {
799        PluginPathKind::ShippingArchive => {
800            unpack_shipping_archive(path, plugin_run_dir).with_context(|| {
801                format!(
802                    "failed to unpack shipping archive for plugin '{}' from '{}'",
803                    name,
804                    path.display()
805                )
806            })?;
807        }
808        PluginPathKind::ShippingDirectory => {
809            copy_directory_tree(path, plugin_run_dir).with_context(|| {
810                format!(
811                    "failed to copy shipping directory for plugin '{}' from '{}'",
812                    name,
813                    path.display()
814                )
815            })?;
816        }
817        PluginPathKind::CrateOrWorkspaceDirectory => {
818            let (profile, target_dir) = (params.get_build_profile(), &params.target_dir);
819            if !params.no_build {
820                cargo_build(profile, target_dir, path).with_context(|| {
821                    format!(
822                        "failed to build external cargo plugin '{}' at '{}'",
823                        name,
824                        path.display()
825                    )
826                })?;
827            }
828            let src_shipping_dir = path.join(target_dir).join(profile.to_string()).join(name);
829            copy_directory_tree(&src_shipping_dir, plugin_run_dir).with_context(|| {
830                format!(
831                    "failed to copy built plugin '{}' from '{}' (profile {})",
832                    name,
833                    src_shipping_dir.display(),
834                    profile
835                )
836            })?;
837        }
838    }
839    Ok(())
840}
841
842/// Prepares plugin directory structure for external plugins from topology
843///
844/// Depending on whether plugin path destination is plugin project directory,
845/// built plugin directory or zip-packed plugin directory, maybe invoke cargo build
846fn prepare_external_plugins(params: &Params, plugin_run_dir: &Path) -> Result<()> {
847    if !params.topology.has_external_plugins() {
848        return Ok(());
849    }
850
851    let external_plugins: Vec<_> = params
852        .topology
853        .plugins
854        .iter()
855        .filter(|(_, p)| p.is_external())
856        .collect();
857
858    info!(
859        "Found {} external plugins, loading into {}",
860        external_plugins.len(),
861        plugin_run_dir.display()
862    );
863
864    let mut path_info: HashMap<&str, (PluginPathKind, PathBuf)> =
865        HashMap::with_capacity(external_plugins.len());
866
867    for (name, plugin) in external_plugins {
868        let path = plugin
869            .path
870            .as_ref()
871            .ok_or_else(|| anyhow!("external plugin '{name}' has no path"))?;
872        let kind = get_external_plugin_path_kind(path).with_context(|| {
873            format!(
874                "failed to validate external path '{}' for plugin {}",
875                path.display(),
876                name
877            )
878        })?;
879        path_info.insert(name.as_str(), (kind, path.clone()));
880    }
881
882    for (name, (kind, path)) in &path_info {
883        materialize_external_plugin(name, *kind, path, params, plugin_run_dir)?;
884    }
885    Ok(())
886}
887
/// Parameters of the run command; constructed through the generated builder,
/// where every field except `topology` has a default.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Builder, Clone)]
pub struct Params {
    /// Cluster topology to run.
    topology: Topology,
    /// Data directory; combined with `plugin_path` to get the cluster directory.
    #[builder(default = "PathBuf::from(\"./tmp\")")]
    data_dir: PathBuf,
    /// Presumably skips plugin installation after the cluster starts; not
    /// read in the visible part of this file (TODO confirm).
    #[builder(default = "false")]
    disable_plugin_install: bool,
    /// Base iproto port; an instance listens on `base_bin_port + instance_id`
    /// unless overridden through the environment.
    #[builder(default = "3000")]
    base_bin_port: u16,
    /// Base HTTP port; an instance listens on `base_http_port + instance_id`
    /// unless overridden through the environment.
    #[builder(default = "8000")]
    base_http_port: u16,
    /// Path to (or name of) the picodata executable.
    #[builder(default = "PathBuf::from(\"picodata\")")]
    picodata_path: PathBuf,
    /// Base pg port; an instance listens on `base_pg_port + instance_id`
    /// unless overridden through the environment.
    #[builder(default = "5432")]
    base_pg_port: u16,
    /// Selects the release build profile instead of debug.
    #[builder(default = "false")]
    use_release: bool,
    /// Cargo target directory, joined onto the plugin path.
    #[builder(default = "PathBuf::from(\"target\")")]
    target_dir: PathBuf,
    /// Run instances detached: their output is not captured and picodata
    /// writes its own log file.
    #[builder(default = "false")]
    daemon: bool,
    /// Disables coloring of the instance name prefix in forwarded logs.
    #[builder(default = "false")]
    disable_colors: bool,
    /// Root directory of the plugin project.
    #[builder(default = "PathBuf::from(\"./\")")]
    plugin_path: PathBuf,
    /// Skips the cargo build of external crate/workspace plugins.
    #[builder(default = "false")]
    no_build: bool,
    /// Path of the picodata config, joined onto `plugin_path`.
    #[builder(default = "PathBuf::from(\"./picodata.yaml\")")]
    config_path: PathBuf,
    /// Not read in the visible part of this file; presumably selects a single
    /// instance to operate on (TODO confirm).
    #[builder(default)]
    instance_name: Option<String>,
    /// Not read in the visible part of this file; presumably enables web
    /// authentication (TODO confirm).
    #[builder(default = "false")]
    with_web_auth: bool,
    /// Passes `--audit <data_dir>/audit.log` to every instance.
    #[builder(default = "false")]
    with_audit: bool,
    /// Not read in the visible part of this file; presumably waits for vshard
    /// discovery after start (TODO confirm).
    #[builder(default = "false")]
    wait_vshard_discovery: bool,
    /// Timeout for the vshard discovery wait; the unit is not visible here
    /// (presumably seconds - TODO confirm).
    #[builder(default = "300")]
    wait_vshard_discovery_timeout: u64,
}
929
930impl Params {
931    pub fn get_build_profile(&self) -> BuildType {
932        if self.use_release {
933            BuildType::Release
934        } else {
935            BuildType::Debug
936        }
937    }
938
939    pub fn get_plugins_dir(&self) -> PathBuf {
940        let build_profile = self.get_build_profile();
941        self.plugin_path
942            .join(self.target_dir.join(build_profile.to_string()))
943    }
944
945    pub fn get_cluster_dir(&self) -> PathBuf {
946        get_cluster_dir(&self.plugin_path, &self.data_dir)
947    }
948}
949
950fn configure_web_auth<F>(
951    picodata_path: &Path,
952    socket_path: &Path,
953    with_web_auth: bool,
954    run_admin: F,
955) -> Result<()>
956where
957    F: Fn(&Path, &Path, &str) -> Result<String>,
958{
959    if with_web_auth {
960        run_admin(picodata_path, socket_path, "ALTER SYSTEM RESET jwt_secret;")
961            .context("failed to enable WebUI authentication (RESET jwt_secret)")?;
962        info!("WebUI auth: включена (RESET jwt_secret).");
963    } else {
964        run_admin(
965            picodata_path,
966            socket_path,
967            "ALTER SYSTEM SET jwt_secret = '';",
968        )
969        .context("failed to disable WebUI authentication (SET jwt_secret='')")?;
970        info!("WebUI auth: отключена (jwt_secret='').");
971    }
972    Ok(())
973}
974
975// При ошибке только предупреждаем, запуск не падает
976fn apply_web_auth_setting(params: &Params, cluster_dir: &Path) -> Result<()> {
977    let Some(socket_path) = find_active_socket_path(cluster_dir)? else {
978        bail!("не удалось найти активный admin.sock для применения настройки WebUI auth");
979    };
980
981    let run_admin = |p: &Path, s: &Path, q: &str| run_query_in_picodata_admin(p, s, q);
982
983    if let Err(err) = configure_web_auth(
984        &params.picodata_path,
985        &socket_path,
986        params.with_web_auth,
987        run_admin,
988    ) {
989        warn!("Не удалось применить WebUI auth настройку: {err:#}");
990    }
991
992    Ok(())
993}
994
995fn run_single_instance(
996    params: &Params,
997    plugins_dir: Option<&PathBuf>,
998) -> anyhow::Result<Vec<PicodataInstance>> {
999    let instance_name = params.instance_name.as_ref().unwrap().clone();
1000    let cluster_dir = params.get_cluster_dir();
1001
1002    // Check whether instance is already started.
1003    if get_active_socket_path(&cluster_dir, &instance_name).is_some() {
1004        log_instance_skipped(instance_name);
1005        return Ok(vec![]);
1006    }
1007
1008    let dirs = fs::read_dir(&cluster_dir).context(format!(
1009        "cluster data dir with path {} does not exist",
1010        cluster_dir.to_string_lossy()
1011    ))?;
1012
1013    info!(
1014        "running picodata cluster instance '{instance_name}', data folder: {}",
1015        cluster_dir.join(&instance_name).to_string_lossy()
1016    );
1017
1018    // Find directory that belongs to instance.
1019    let mut instance_dir = dirs
1020        .into_iter()
1021        .find_map(|result| {
1022            let dir_entry = result.ok()?;
1023            let filename = dir_entry.file_name();
1024
1025            let Some(dir_name) = filename.to_str() else {
1026                // skip non UTF-8 strings
1027                return None;
1028            };
1029
1030            if dir_name != instance_name {
1031                return None;
1032            }
1033
1034            Some(dir_entry.path())
1035        })
1036        .ok_or({
1037            anyhow::anyhow!("failed to locate directory of the instance '{instance_name}")
1038        })?;
1039
1040    if instance_dir.is_symlink() {
1041        instance_dir = fs::read_link(instance_dir)?;
1042    }
1043    let pico_instance_name = instance_dir
1044        .file_name()
1045        .expect("unreachable: canonicalized path cannot have .. as filename")
1046        .to_str();
1047    let instance_id = pico_instance_name
1048        .expect("unreachable: instance path should be convertible to str")[1..]
1049        .parse::<u16>()?;
1050
1051    let mut instance_id_counter = 0;
1052    let mut instance_tier_name = &String::new();
1053    for (tier_name, tier) in &params.topology.tiers {
1054        instance_id_counter += u16::from(tier.replicasets * tier.replication_factor);
1055        if instance_id <= instance_id_counter {
1056            instance_tier_name = tier_name;
1057            break;
1058        }
1059    }
1060
1061    let pico_instance =
1062        PicodataInstance::new(instance_id, plugins_dir, instance_tier_name, params)?;
1063
1064    log_instance_started(instance_name);
1065
1066    apply_web_auth_setting(params, &cluster_dir)?;
1067
1068    Ok(vec![pico_instance])
1069}
1070
1071fn run_cluster(params: &Params, plugins_dir: Option<&PathBuf>) -> Result<Vec<PicodataInstance>> {
1072    assert!(params.instance_name.is_none(), "invariant");
1073
1074    let cluster_dir = params.get_cluster_dir();
1075    let picodata_version = get_picodata_version(&params.picodata_path)?;
1076
1077    info!("Running the cluster with {picodata_version}...");
1078    let start_cluster_run = Instant::now();
1079    let mut picodata_processes = start_instances_in_tiers(params, plugins_dir)?;
1080
1081    readiness::wait_instances_ready(&picodata_processes)?;
1082    apply_web_auth_setting(params, &cluster_dir)?;
1083
1084    if params.wait_vshard_discovery {
1085        readiness::wait_vshard_discovery(&picodata_processes, params)?;
1086    }
1087
1088    if !params.disable_plugin_install && !params.topology.plugins.is_empty() {
1089        if plugins_dir.is_none() {
1090            bail!("failed to enable plugins: directory with plugins is missing.")
1091        }
1092        info!("Enabling plugins...");
1093        let result = enable_plugins(&params.topology, &cluster_dir, &params.picodata_path);
1094        if let Err(e) = result {
1095            for process in &mut picodata_processes {
1096                process.kill().unwrap_or_else(|e| {
1097                    error!("failed to kill picodata instances: {e:#}");
1098                });
1099            }
1100            bail!("failed to enable plugins: {e}");
1101        }
1102    }
1103
1104    info!(
1105        "Picodata cluster has started (launch time: {} sec, total instances: {})",
1106        start_cluster_run.elapsed().as_secs(),
1107        picodata_processes.len()
1108    );
1109
1110    Ok(picodata_processes)
1111}
1112
1113/// Spins up Picodata instances for all tiers defined in the provided topology.
1114///
1115/// This routine iterates over all configured tiers and ensures that the expected
1116/// number of instances (`replicasets * replication_factor`) are running for each tier.
1117///
1118/// # Returns
1119///
1120/// Returns a vector of [`PicodataInstance`] representing **only the instances
1121/// that were started during this invocation**.
1122///
1123fn start_instances_in_tiers(
1124    params: &Params,
1125    plugins_dir: Option<&PathBuf>,
1126) -> anyhow::Result<Vec<PicodataInstance>> {
1127    let cluster_dir = params.get_cluster_dir();
1128    let mut picodata_processes = vec![];
1129    let mut instance_id = 0;
1130
1131    for (tier_name, tier) in &params.topology.tiers {
1132        info!("Starting instances in tier '{tier_name}' ...");
1133        for _ in 0..(tier.replicasets * tier.replication_factor) {
1134            instance_id += 1;
1135            let instance_name = PicodataInstance::make_name(instance_id);
1136
1137            if get_active_socket_path(&cluster_dir, &instance_name).is_some() {
1138                log_instance_skipped(instance_name);
1139                continue;
1140            }
1141
1142            let pico_instance = PicodataInstance::new(instance_id, plugins_dir, tier_name, params)?;
1143            picodata_processes.push(pico_instance);
1144
1145            log_instance_started(instance_name);
1146        }
1147    }
1148
1149    Ok(picodata_processes)
1150}
1151
1152fn prepare_directory_with_plugins(params: &mut Params) -> anyhow::Result<Option<PathBuf>> {
1153    if is_plugin_dir(&params.plugin_path) {
1154        let plugins_dir = params.get_plugins_dir();
1155        let build_profile = params.get_build_profile();
1156
1157        if !params.no_build {
1158            cargo_build(build_profile, &params.target_dir, &params.plugin_path)?;
1159        }
1160
1161        prepare_external_plugins(params, &plugins_dir)?;
1162        params.topology.find_plugin_versions(&plugins_dir)?;
1163        return Ok(Some(plugins_dir));
1164    }
1165
1166    if params.topology.has_external_plugins() {
1167        // Нет родительского плагина, но есть внешние плагины.
1168        // Готовим их в служебной директории кластера и используем её как --share-dir
1169        let plugins_dir = params.get_cluster_dir().join("plugins");
1170
1171        fs::create_dir_all(&plugins_dir).with_context(|| {
1172            format!(
1173                "failed to create directory for external plugins at {}",
1174                plugins_dir.display()
1175            )
1176        })?;
1177
1178        prepare_external_plugins(params, &plugins_dir)?;
1179        params.topology.find_plugin_versions(&plugins_dir)?;
1180        return Ok(Some(plugins_dir));
1181    }
1182
1183    if !params.topology.plugins.is_empty() {
1184        bail!(
1185            "failed to prepare plugins: plugin directory is unknown.\n\
1186            If you use external plugins, ensure they are prepared or run from a pike plugin project"
1187        )
1188    }
1189
1190    Ok(None)
1191}
1192
1193pub fn cluster(mut params: Params) -> Result<Vec<PicodataInstance>> {
1194    let plugins_dir = prepare_directory_with_plugins(&mut params)?;
1195
1196    if params.instance_name.is_some() {
1197        info!("Starting single cluster instance");
1198        return run_single_instance(&params, plugins_dir.as_ref());
1199    }
1200
1201    if let Some(sock_path) = find_active_socket_path(&params.get_cluster_dir())? {
1202        info!("Cluster is running (connected via {})", sock_path.display());
1203        // Reviving terminated instances and exit.
1204        return start_instances_in_tiers(&params, plugins_dir.as_ref());
1205    }
1206
1207    run_cluster(&params, plugins_dir.as_ref())
1208}
1209
1210fn print_webui_url(pico_instances: &[PicodataInstance]) {
1211    let Some(first) = pico_instances.first() else {
1212        return;
1213    };
1214
1215    // Prefer the raft leader's HTTP port
1216    // Fall back to the first instance if the leader is unknown for any reason
1217    // e.g. election still in progress or the health query fails.
1218    let leader_name = get_health_status(first)
1219        .ok()
1220        .map(|s| s.raft.leader_name)
1221        .unwrap_or_default();
1222    // Find the leader instance by name; an empty name means no leader elected yet.
1223    let leader_instance = pico_instances
1224        .iter()
1225        .find(|i| !leader_name.is_empty() && i.properties().instance_name == leader_name);
1226    let port = leader_instance.unwrap_or(first).http_port();
1227
1228    let url = format!("http://localhost:{port}").bold();
1229    println!("\nCluster is running. To open Web UI, visit:\n  {url}\n");
1230}
1231
1232pub fn cmd(params: Params) -> Result<()> {
1233    let is_daemon_mode = params.daemon;
1234    let mut pico_instances = cluster(params)?;
1235
1236    print_webui_url(&pico_instances);
1237
1238    if is_daemon_mode {
1239        return Ok(());
1240    }
1241
1242    // Set Ctrl+C handler. Upon receiving Ctrl+C signal
1243    // All instances would be killed, then joined and
1244    // destructors will be called
1245    let picodata_pids: Vec<u32> = pico_instances.iter().map(|p| p.child.id()).collect();
1246    ctrlc::set_handler(move || {
1247        info!("received Ctrl+C. Shutting down ...");
1248
1249        for &pid in &picodata_pids {
1250            let _ = kill(Pid::from_raw(pid.cast_signed()), Signal::SIGKILL);
1251        }
1252    })
1253    .context("failed to set Ctrl+c handler")?;
1254
1255    // Wait for all instances to stop
1256    for instance in &mut pico_instances {
1257        instance.join();
1258    }
1259
1260    Ok(())
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use super::*;
1266    use crate::commands::lib::LIB_EXT;
1267    use flate2::write::GzEncoder;
1268    use flate2::Compression;
1269    use std::cell::RefCell;
1270    use std::time::{SystemTime, UNIX_EPOCH};
1271    use tar::Builder;
1272
1273    fn tmp_dir(prefix: &str) -> PathBuf {
1274        let ts = SystemTime::now()
1275            .duration_since(UNIX_EPOCH)
1276            .unwrap()
1277            .as_nanos();
1278        let mut dir = std::env::temp_dir();
1279        dir.push(format!("pike-run-ut-{prefix}-{ts}"));
1280        dir
1281    }
1282
1283    fn capture_runner(
1284        captured: &RefCell<Vec<String>>,
1285    ) -> impl Fn(&Path, &Path, &str) -> Result<String> + '_ {
1286        move |_: &Path, _: &Path, q: &str| -> Result<String> {
1287            captured.borrow_mut().push(q.to_string());
1288            Ok(String::new())
1289        }
1290    }
1291
1292    #[test]
1293    fn web_auth_config_enables_with_reset() {
1294        let picodata = Path::new("picodata");
1295        let sock = Path::new("/tmp/admin.sock");
1296        let captured: RefCell<Vec<String>> = RefCell::new(vec![]);
1297
1298        configure_web_auth(picodata, sock, true, capture_runner(&captured)).unwrap();
1299
1300        let calls = captured.borrow();
1301        assert_eq!(calls.len(), 1);
1302        assert!(
1303            calls[0].contains("ALTER SYSTEM RESET jwt_secret;"),
1304            "expected RESET query, got: {}",
1305            calls[0]
1306        );
1307    }
1308
1309    #[test]
1310    fn web_auth_config_clears_secret_when_disabled() {
1311        let picodata = Path::new("picodata");
1312        let sock = Path::new("/tmp/admin.sock");
1313        let captured: RefCell<Vec<String>> = RefCell::new(vec![]);
1314
1315        configure_web_auth(picodata, sock, false, capture_runner(&captured)).unwrap();
1316
1317        let calls = captured.borrow();
1318        assert_eq!(calls.len(), 1);
1319        assert!(
1320            calls[0].contains("ALTER SYSTEM SET jwt_secret = '';"),
1321            "expected query to clear secret, got: {}",
1322            calls[0]
1323        );
1324    }
1325
1326    fn temp_dir_unique(prefix: &str) -> PathBuf {
1327        let nanos = SystemTime::now()
1328            .duration_since(UNIX_EPOCH)
1329            .unwrap()
1330            .as_nanos();
1331        let dir = std::env::temp_dir().join(format!("{prefix}_{nanos}"));
1332        fs::create_dir_all(&dir).unwrap();
1333        dir
1334    }
1335
1336    #[test]
1337    fn merged_cluster_tier_config_ok_when_config_missing() {
1338        let plugin_dir = temp_dir_unique("pike_test_missing");
1339        // config_path relative to plugin_dir
1340        let config_path = PathBuf::from("picodata.yaml");
1341
1342        let mut tiers = BTreeMap::new();
1343        tiers.insert(
1344            "t1".to_string(),
1345            Tier {
1346                replicasets: 1,
1347                replication_factor: 2,
1348            },
1349        );
1350
1351        let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1352        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1353        assert!(v.get("t1").is_some());
1354        assert_eq!(
1355            v["t1"]["replication_factor"].as_i64().unwrap(),
1356            2,
1357            "replication_factor from topology must be applied"
1358        );
1359    }
1360
1361    #[test]
1362    fn merged_cluster_tier_config_ok_when_config_empty() {
1363        let plugin_dir = temp_dir_unique("pike_test_empty");
1364        let config_path = PathBuf::from("picodata.yaml");
1365        let config_file = plugin_dir.join(&config_path);
1366        fs::write(&config_file, "").unwrap();
1367
1368        let mut tiers = BTreeMap::new();
1369        tiers.insert(
1370            "t1".to_string(),
1371            Tier {
1372                replicasets: 1,
1373                replication_factor: 3,
1374            },
1375        );
1376
1377        let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1378        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1379        assert_eq!(v["t1"]["replication_factor"].as_i64().unwrap(), 3);
1380    }
1381
1382    #[test]
1383    fn merged_cluster_tier_config_errors_on_invalid_yaml() {
1384        let plugin_dir = temp_dir_unique("pike_test_invalid");
1385        let config_path = PathBuf::from("picodata.yaml");
1386        let config_file = plugin_dir.join(&config_path);
1387        // invalid YAML
1388        fs::write(&config_file, "cluster: [\n - broken").unwrap();
1389
1390        let mut tiers = BTreeMap::new();
1391        tiers.insert(
1392            "t1".to_string(),
1393            Tier {
1394                replicasets: 1,
1395                replication_factor: 1,
1396            },
1397        );
1398
1399        let res = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers);
1400        assert!(res.is_err(), "invalid YAML must return error");
1401    }
1402
1403    #[test]
1404    fn merged_cluster_tier_config_preserves_valid_config_and_overrides_replication_factor() {
1405        let plugin_dir = temp_dir_unique("pike_test_valid");
1406        let config_path = PathBuf::from("picodata.yaml");
1407        let config_file = plugin_dir.join(&config_path);
1408        let yaml = r#"
1409        cluster:
1410          tier:
1411            t1: null
1412            t2:
1413              replication_factor: 99
1414              custom: "keep"
1415        "#;
1416        fs::write(&config_file, yaml).unwrap();
1417
1418        let mut tiers = BTreeMap::new();
1419        tiers.insert(
1420            "t1".to_string(),
1421            Tier {
1422                replicasets: 1,
1423                replication_factor: 5,
1424            },
1425        );
1426        tiers.insert(
1427            "t2".to_string(),
1428            Tier {
1429                replicasets: 1,
1430                replication_factor: 7,
1431            },
1432        );
1433
1434        let json = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap();
1435        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1436
1437        // t1 must become a map with replication_factor
1438        assert_eq!(v["t1"]["replication_factor"].as_i64().unwrap(), 5);
1439
1440        // t2 must override replication_factor but keep custom key
1441        assert_eq!(v["t2"]["replication_factor"].as_i64().unwrap(), 7);
1442        assert_eq!(v["t2"]["custom"].as_str().unwrap(), "keep");
1443    }
1444
1445    #[test]
1446    fn merged_cluster_tier_config_errors_on_non_mapping_root() {
1447        let plugin_dir = temp_dir_unique("pike_test_non_mapping_root");
1448        let config_path = PathBuf::from("picodata.yaml");
1449        let config_file = plugin_dir.join(&config_path);
1450
1451        fs::write(&config_file, "- a\n- b\n- c\n").unwrap();
1452
1453        let mut tiers = BTreeMap::new();
1454        tiers.insert(
1455            "t1".to_string(),
1456            Tier {
1457                replicasets: 1,
1458                replication_factor: 1,
1459            },
1460        );
1461
1462        let err = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap_err();
1463        let msg = format!("{err:#}");
1464        assert!(
1465            msg.contains("expected YAML mapping"),
1466            "expected error about non-mapping root, got: {msg}"
1467        );
1468    }
1469
1470    #[test]
1471    fn enable_plugins_fails_on_missing_version() {
1472        let topology = Topology {
1473            tiers: BTreeMap::new(),
1474            plugins: {
1475                let mut m = BTreeMap::new();
1476                m.insert(
1477                    "p".to_string(),
1478                    Plugin {
1479                        migration_context: vec![],
1480                        services: BTreeMap::new(),
1481                        version: None,
1482                        path: None,
1483                    },
1484                );
1485                m
1486            },
1487            enviroment: BTreeMap::new(),
1488            pre_install_sql: vec![],
1489        };
1490        let cluster_dir = temp_dir_unique("pike_test_cluster");
1491        let picodata_path = Path::new("picodata");
1492        let err = enable_plugins(&topology, &cluster_dir, picodata_path).unwrap_err();
1493        let msg = format!("{err:#}");
1494        assert!(
1495            msg.contains("plugin version is missing"),
1496            "Expected explicit error on missing version, got: {msg}"
1497        );
1498    }
1499
1500    #[test]
1501    fn merged_cluster_tier_config_errors_on_null_root() {
1502        let plugin_dir = temp_dir_unique("pike_test_null_root");
1503        let config_path = PathBuf::from("picodata.yaml");
1504        let config_file = plugin_dir.join(&config_path);
1505
1506        fs::write(&config_file, "null").unwrap();
1507
1508        let mut tiers = BTreeMap::new();
1509        tiers.insert(
1510            "t1".to_string(),
1511            Tier {
1512                replicasets: 1,
1513                replication_factor: 1,
1514            },
1515        );
1516
1517        let err = get_merged_cluster_tier_config(&plugin_dir, &config_path, &tiers).unwrap_err();
1518        assert!(
1519            format!("{err:#}").contains("expected YAML mapping"),
1520            "expected error about null root, got: {err}"
1521        );
1522    }
1523
1524    #[test]
1525    fn external_plugin_absolute_path_supported() {
1526        // Create a shipping directory structure: plugin_name/version/manifest.yaml
1527        let base = tmp_dir("abs");
1528        let plugin_name = "abs_plugin";
1529        let version = "0.1.0";
1530        let shipping_root = base.join(plugin_name).join(version);
1531        fs::create_dir_all(&shipping_root).unwrap();
1532        fs::write(shipping_root.join("manifest.yaml"), "name: abs_plugin\n").unwrap();
1533
1534        let kind = get_external_plugin_path_kind(&base.join(plugin_name)).unwrap();
1535        assert_eq!(kind, PluginPathKind::ShippingDirectory);
1536    }
1537
1538    #[test]
1539    fn materialize_external_plugin_shipping_directory() {
1540        let base = tmp_dir("mat_dir");
1541        let src = base.join("test_plugin");
1542        let dst = base.join("dst");
1543        fs::create_dir_all(&src).unwrap();
1544        fs::create_dir_all(&dst).unwrap();
1545
1546        fs::write(src.join("manifest.yaml"), "name: test\n").unwrap();
1547
1548        let topology = Topology::default();
1549        let params = ParamsBuilder::default().topology(topology).build().unwrap();
1550
1551        materialize_external_plugin(
1552            "test_plugin",
1553            PluginPathKind::ShippingDirectory,
1554            &src,
1555            &params,
1556            &dst,
1557        )
1558        .unwrap();
1559
1560        assert!(dst.join("test_plugin/manifest.yaml").exists());
1561    }
1562
1563    #[test]
1564    fn materialize_external_plugin_shipping_archive() {
1565        let base = tmp_dir("mat_arc");
1566        fs::create_dir_all(&base).unwrap();
1567        let archive_path = base.join("plugin.tar.gz");
1568        let dst = base.join("dst");
1569        fs::create_dir_all(&dst).unwrap();
1570
1571        // Create dummy tar.gz
1572        {
1573            let file = File::create(&archive_path).unwrap();
1574            let enc = GzEncoder::new(file, Compression::default());
1575            let mut tar = Builder::new(enc);
1576
1577            // Create required structure for is_plugin_archive
1578            // plugin_name / plugin_version / [manifest.yaml | lib...]
1579            let header_path_manifest = Path::new("test_plugin/0.1.0/manifest.yaml");
1580            let mut header = tar::Header::new_gnu();
1581            header.set_size(0);
1582            header.set_cksum();
1583            tar.append_data(&mut header, header_path_manifest, &mut "".as_bytes())
1584                .unwrap();
1585
1586            let lib_name = format!("test_plugin/0.1.0/libtest.{LIB_EXT}");
1587            let header_path_lib = Path::new(&lib_name);
1588            let mut header_lib = tar::Header::new_gnu();
1589            header_lib.set_size(0);
1590            header_lib.set_cksum();
1591            tar.append_data(&mut header_lib, header_path_lib, &mut "".as_bytes())
1592                .unwrap();
1593
1594            tar.finish().unwrap();
1595        }
1596
1597        let topology = Topology::default();
1598        let params = ParamsBuilder::default().topology(topology).build().unwrap();
1599
1600        materialize_external_plugin(
1601            "test_plugin",
1602            PluginPathKind::ShippingArchive,
1603            &archive_path,
1604            &params,
1605            &dst,
1606        )
1607        .unwrap();
1608
1609        let unpacked_manifest = dst.join("test_plugin/0.1.0/manifest.yaml");
1610        assert!(
1611            unpacked_manifest.exists(),
1612            "manifest should be unpacked at {}",
1613            unpacked_manifest.display()
1614        );
1615    }
1616
1617    #[test]
1618    fn materialize_external_plugin_crate_no_build() {
1619        let base = tmp_dir("mat_crate");
1620        let plugin_path = base.join("my_plugin");
1621        // emulate build artifact: target/debug/my_plugin/manifest.yaml
1622        let artifact_path = plugin_path.join("target/debug/my_plugin");
1623        fs::create_dir_all(&artifact_path).unwrap();
1624        fs::write(artifact_path.join("manifest.yaml"), "artifact").unwrap();
1625
1626        let dst = base.join("dst");
1627        fs::create_dir_all(&dst).unwrap();
1628
1629        let topology = Topology::default();
1630        let params = ParamsBuilder::default()
1631            .topology(topology)
1632            .no_build(true)
1633            .target_dir(PathBuf::from("target"))
1634            .build()
1635            .unwrap();
1636
1637        materialize_external_plugin(
1638            "my_plugin",
1639            PluginPathKind::CrateOrWorkspaceDirectory,
1640            &plugin_path,
1641            &params,
1642            &dst,
1643        )
1644        .unwrap();
1645
1646        assert!(dst.join("my_plugin/manifest.yaml").exists());
1647    }
1648    #[test]
1649    fn test_topology_deserialization_with_pre_install_sql() {
1650        let toml_str = r#"
1651        pre_install_sql = [
1652            'CREATE TABLE "t" ("id" INT PRIMARY KEY);',
1653            "ALTER SYSTEM SET param = 'value';",
1654            '''
1655            ALTER SYSTEM SET multiline = 'test';
1656            '''
1657        ]
1658        [tier.default]
1659        replicasets = 1
1660        replication_factor = 1
1661        "#;
1662        let topology: Topology = toml::from_str(toml_str).unwrap();
1663        assert_eq!(topology.pre_install_sql.len(), 3);
1664        assert_eq!(
1665            topology.pre_install_sql[0].trim(),
1666            "CREATE TABLE \"t\" (\"id\" INT PRIMARY KEY);"
1667        );
1668        assert_eq!(
1669            topology.pre_install_sql[1].trim(),
1670            "ALTER SYSTEM SET param = 'value';"
1671        );
1672        assert_eq!(
1673            topology.pre_install_sql[2].trim(),
1674            "ALTER SYSTEM SET multiline = 'test';"
1675        );
1676    }
1677}