tag2upload_service_manager/
cli_cli.rs

1//! Real command-line program startup
2#![cfg(not(test))]
3
4use crate::prelude::*;
5
6use clap::Parser;
7
// Module path patterns saying which `bsql!` call sites we care about.
// `bsql_coverage_wanted` parses this text into a `ModulePathSelector`.
//
// Call sites in tag2upload_service_manager::test:: don't actually
// appear in the list of unexecuted code locations anyway.  That's
// because that whole module is #[cfg(test)], so it doesn't end up in
// the executable.
//
// But we still want this to filter out *executed* transactions, which
// might have been run by tests.  And to filter out hypothetical bsql!s
// in out-of-workspace dependencies.
const BSQL_COVERAGE_WANTED: &str = r"
    tag2upload_service_manager::
    !tag2upload_service_manager::test::
";
19
20fn bsql_coverage_wanted() -> bsql_coverage::ModulePathSelector {
21    BSQL_COVERAGE_WANTED.parse().expect("bad patterns in source code")
22}
23
24/// Task that will send a systemd-style startup notification when we're up
25///
26/// start-stop-daemon can perform this protocol too,
27/// and that makes for a more convenient way to daemonise
28/// than messing about with double fork in Rust.
29pub async fn sdnotify_task(started: oneshot::Receiver<Result<(), String>>) {
30    use sd_notify::NotifyState as NS;
31    let notify = |l| {
32        let _: io::Result<()> = sd_notify::notify(true, l);
33    };
34
35    match async {
36        let started = started.await
37            .map_err(|_| format!("crashed during startup"))?
38            .map_err(|e: String| format!("startup failed: {e}"))?;
39        globals().await_running().await
40            .map_err(|ShuttingDown| format!("shut down during startup?!"))?;
41        Ok::<_, String>(started)
42    }.await {
43        Err(e) => notify(&[
44            NS::Status(&e),
45            NS::Errno(libc::EINVAL as _),
46        ]),
47        Ok(()) => notify(&[
48            NS::Ready,
49        ]),
50    };
51}
52
53pub fn t_print_bsql_query_plans(
54    record: &mut dyn io::Read,
55    temp_dir: &str,
56) -> anyhow::Result<()> {
57    let mut used = BTreeMap::<_, BTreeSet<_>>::new();
58    let wanted = bsql_coverage_wanted();
59
60    for et in bsql_coverage::read_executed_texts(record)? {
61        // Don't print the query plans for queries that had any bsql!
62        // that was in a test module - those aren't production queries.
63        if ! et.locs.iter().all(|loc| wanted.wants(loc).is_some()) {
64            continue;
65        }
66
67        used.entry(et.sql_text).or_default().insert(
68            et.locs.into_iter().collect::<BTreeSet<_>>()
69        );
70    };
71
72    let temp_db_file = format!("{}/empty-for-explain.db", temp_dir);
73
74    let sqlite_output = |qtxt: &_| {
75        utils::t_run_sqlite_batch_qtxt(&temp_db_file, qtxt)
76    };
77
78    sqlite_output(&db_schema::schema())?;
79
80    let buffer = used.iter().map(|(q, locs_locs)| {
81        let plan = sqlite_output(&format!("EXPLAIN QUERY PLAN {q}"))?;
82        anyhow::Result::Ok((q, locs_locs, plan))
83    }).collect::<anyhow::Result<Vec<_>>>()?;
84
85    let mut delim =
86        "==================== sql query texts run ====================";
87    for (q, locs_locs, plan) in buffer {
88        println!("{delim}");
89        for locs in locs_locs {
90            for loc in locs {
91                println!("-- {loc}");
92            }
93            if locs.is_empty() {
94                println!("-- (untracked)");
95            }
96            println!("----");
97        }
98        println!("\n{q}");
99
100        println!("----------");
101        print!("{plan}");
102
103        delim = "\n========================================"
104    }
105    println!("\n========================================");
106
107    Ok(())
108}
109
110/// Main entrypoint, called from actual `main` in `main.rs`
111#[cfg(not(test))]
112#[rocket::main]
113pub async fn main() -> Result<(), AE> {
114    use CliOperation as Op;
115
116    let mut cli_options = CliOptions::parse();
117
118    struct ShouldContinue; // detect omission of `return Ok(())`
119    let ShouldContinue = match &cli_options.op {
120        Op::PrintDatabaseSchema {} => {
121            print!("{}", db_schema::schema());
122            return Ok(());
123        }
124        Op::PrintDatabaseSchemaVersion {} => {
125            println!("{}", serde_json::to_string_pretty(&json! { {
126                "intended": db_schema::SCHEMA_VERSION,
127            }}).expect("format json failed"));
128            return Ok(());
129        }
130        Op::SchemaMigrationAdhoc {
131            commit,
132            db_file,
133            from_version,
134            to_version,
135            temp_dir,
136        } => {
137            (|| {
138                let mut progress = io::stdout().lock();
139                let conn = rusqlite::Connection::open(&db_file)
140                    .with_db_context(|| db_file.clone())
141                    .db_context("open db_file")?;
142                db_migration::migration_core(
143                    conn,
144                    temp_dir.as_ref(),
145                    &mut progress,
146                    *from_version,
147                    *to_version,
148                    &db_schema::migration_data(),
149                    db_migration::OnComplete::from_commit_bool(*commit),
150                )
151            })()
152                .map_err(|e| {
153                    // We haven't actually done any retries!  This
154                    // possible failure is documented in the usage and
155                    // is one reason this subcommand is not for use in
156                    // production.
157                    e.unwrap_after_retries().context("failure")
158                })?;
159            return Ok(());
160        }
161        Op::BsqlCheckCoverage {} => {
162            match bsql_coverage::read_calculate_unexecuted_locs(
163                &mut io::stdin(),
164                &bsql_coverage_wanted(),
165            )? {
166                Ok(()) => {
167                    return Ok(())
168                },
169                Err(e) => {
170                    eprintln!("** untested bsql! calls: **");
171                    print!("{e}");
172                    eprintln!("** untested bsql! calls **");
173                    std::process::exit(1);
174                }
175            }
176        }
177        Op::BsqlPrintQueryPlans {} => {
178            let temp_dir = tempfile::TempDir::new()
179                .context("create temp dir")?;
180            t_print_bsql_query_plans(
181                &mut io::stdin(),
182                &temp_dir.path().to_str()
183                    .ok_or_else(|| anyhow!("non-UTF-8 temp dir"))?,
184            )?;
185            drop(temp_dir);
186            return Ok(())
187        }
188        Op::BsqlPrintSourceLocs {} => {
189            for i in bsql_coverage::BSQL_CONSTRUCTION_SITES {
190                println!("{i}");
191            }
192            return Ok(());
193        }
194        _ => ShouldContinue,
195    };
196
197    if cli_options.config.is_empty() {
198        cli_options.config.push(format!(
199            "{}/.t2usm.toml",
200            std::env::var("HOME").context("need $HOME (or -c option)")?
201        ));
202    }        
203
204    let whole_config = global::resolve_config(
205        cli_options,
206        Figment::new(),
207    )?;
208
209    match &whole_config.unchecked.cli_options.op {
210        Op::CheckConfigSyntax {} => {
211            println!("config syntax OK.");
212            return Ok(());
213        }
214        _ => {}
215    }
216
217    let whole_config = whole_config.check()?;
218
219    match whole_config.cli_options.op.clone() {
220        Op::RunManager {} => run_manager(whole_config).await,
221        Op::CheckConfig {} => { println!("config OK."); Ok(()) }
222        Op::PreInstallCheck {} => {
223            println!("config OK.");
224
225            let mut migr_progress = vec![];
226            let temp_dir = tempfile::TempDir::new()
227                .context("create temp dir")?;
228            let outcome = db_support::schema_prepare_idempotent(
229                &whole_config.config,
230                &whole_config.computed_config,
231                temp_dir.path(),
232                &mut migr_progress,
233                db_migration::OnComplete::Rollback,
234            )
235                .inspect_err(|_e| {
236                    println!("** schema migration log**\n{}",
237                             String::from_utf8_lossy(&migr_progress));
238                    println!("** schema migration log ends **");
239                })?;
240            drop(temp_dir);
241            println!("schema OK {outcome}.");
242
243            Ok(())
244        },
245        Op::BsqlPrintSourceLocs {} |
246        Op::BsqlPrintQueryPlans {} |
247        Op::BsqlCheckCoverage {} |
248        Op::CheckConfigSyntax {} |
249        Op::PrintDatabaseSchemaVersion {} |
250        Op::PrintDatabaseSchema {} |
251        Op::SchemaMigrationAdhoc { .. } => panic!("not handled earlier?!"),
252    }
253}
254
255#[cfg(not(test))]
256pub async fn run_manager(whole_config: WholeConfig) -> Result<(), AE> {
257    let (sdnotify_started_tx, sdnotify_started_rx) = oneshot::channel();
258    // We use tokio::spawn since we don't want this task to get involved
259    // with our task tracking.  It is supposed to terminate, and isn't
260    // likely to fail.
261    tokio::spawn(sdnotify_task(sdnotify_started_rx));
262
263    let started = global::startup(
264        whole_config,
265        Default::default(),
266        Default::default(),
267        |rocket| rocket,
268    )
269        .await;
270
271    sdnotify_started_tx.send(
272        started.as_ref()
273            .map(|_: &global::Started| ())
274            .map_err(|e: &StartupError| e.to_string())
275    ).expect("notify task exited too soon!");
276
277    started?
278        .rocket
279        .launch().await?;
280
281    Ok(())
282}