// greentic_operator/providers.rs

1use std::{
2    collections::BTreeSet,
3    path::{Path, PathBuf},
4};
5
6use anyhow::Context;
7use chrono::{DateTime, Utc};
8use serde::Serialize;
9use serde_json::{Value, json};
10
11use crate::bin_resolver::{self, ResolveCtx};
12use crate::config::{DemoConfig, DemoProviderConfig};
13use crate::domains::Domain;
14use crate::operator_log;
15use crate::runner_integration;
16use crate::runtime_state::RuntimePaths;
17use crate::secrets_setup::{SecretsSetup, resolve_env};
18use crate::setup_input::{SetupInputAnswers, collect_setup_answers, load_setup_input};
19use greentic_runner_desktop::{RunResult, RunStatus};
20use tokio::runtime::Builder;
21
/// Caller-supplied switches that control how [`run_provider_setup`] behaves.
#[derive(Debug, Clone, Default)]
pub struct ProviderSetupOptions {
    /// Restricts setup to the named providers; `None` selects every configured provider.
    pub providers: Option<Vec<String>>,
    /// Also run each provider's verify flow after setup.
    pub verify_webhooks: bool,
    /// Re-run setup (and verification) even when a previous record file already exists.
    pub force_setup: bool,
    /// Return immediately without setting up any provider.
    pub skip_setup: bool,
    /// Skip seeding pack secrets before running setup.
    pub skip_secrets_init: bool,
    /// Forwarded to the provider-config contract compatibility check.
    pub allow_contract_change: bool,
    /// Forwarded to the provider-config envelope writer.
    pub backup: bool,
    /// Optional file with pre-supplied setup answers.
    pub setup_input: Option<PathBuf>,
    /// Explicit runner binary; a relative path is resolved against the config directory.
    pub runner_binary: Option<PathBuf>,
    /// Log a failing provider and move on to the next one instead of aborting.
    pub continue_on_error: bool,
}
34
35pub fn run_provider_setup(
36    config_dir: &Path,
37    config: &DemoConfig,
38    public_base_url: Option<&str>,
39    options: ProviderSetupOptions,
40) -> anyhow::Result<()> {
41    let providers = resolve_providers(config, options.providers);
42    if providers.is_empty() || options.skip_setup {
43        return Ok(());
44    }
45
46    let runner = resolve_runner_binary(config_dir, options.runner_binary)?;
47    let env = resolve_env(None);
48    let secrets_setup = if options.skip_secrets_init {
49        None
50    } else {
51        Some(SecretsSetup::new(
52            config_dir,
53            &env,
54            &config.tenant,
55            Some(&config.team),
56        )?)
57    };
58    let secrets_runtime = if secrets_setup.is_some() {
59        Some(
60            Builder::new_current_thread()
61                .enable_all()
62                .build()
63                .context("build secrets runtime")?,
64        )
65    } else {
66        None
67    };
68
69    let runtime = RuntimePaths::new(
70        config_dir.join("state"),
71        config.tenant.clone(),
72        config.team.clone(),
73    );
74    let providers_root = runtime.runtime_root().join("providers");
75    std::fs::create_dir_all(&providers_root)?;
76    let provider_keys: BTreeSet<String> = providers.iter().map(|(name, _)| name.clone()).collect();
77    let setup_input_answers = if let Some(path) = options.setup_input.as_ref() {
78        Some(SetupInputAnswers::new(
79            load_setup_input(path)?,
80            provider_keys.clone(),
81        )?)
82    } else {
83        None
84    };
85
86    for (provider, cfg) in providers {
87        let result = (|| -> anyhow::Result<()> {
88            let pack_path = resolve_pack_path(config_dir, &provider, &cfg)?;
89            if let (Some(setup), Some(runtime)) = (secrets_setup.as_ref(), secrets_runtime.as_ref())
90            {
91                runtime
92                    .block_on(async { setup.ensure_pack_secrets(&pack_path, &provider).await })
93                    .with_context(|| format!("seed secrets for provider {provider}"))?;
94            }
95
96            let setup_path = providers_root.join(format!("{provider}.setup.json"));
97            if setup_path.exists() && !options.force_setup {
98                return Ok(());
99            }
100
101            let setup_flow = cfg
102                .setup_flow
103                .clone()
104                .unwrap_or_else(|| "setup_default".to_string());
105            let answers = collect_setup_answers(
106                &pack_path,
107                &provider,
108                setup_input_answers.as_ref(),
109                setup_input_answers.is_none(),
110            )?;
111            let mode = Some(crate::component_qa_ops::QaMode::Setup);
112            let qa_config_override = if let Some(mode) = mode {
113                if let Err(err) = crate::component_qa_ops::persist_answers_artifacts(
114                    &providers_root,
115                    &provider,
116                    mode,
117                    &answers,
118                ) {
119                    operator_log::warn(
120                        module_path!(),
121                        format!(
122                            "failed to persist qa answers provider={} mode={} flow={}: {err}",
123                            provider,
124                            mode.as_str(),
125                            setup_flow
126                        ),
127                    );
128                }
129                let current_config =
130                    crate::provider_config_envelope::read_provider_config_envelope(
131                        &providers_root,
132                        &provider,
133                    )?
134                    .map(|envelope| envelope.config);
135                crate::provider_config_envelope::ensure_contract_compatible(
136                    &providers_root,
137                    &provider,
138                    &setup_flow,
139                    &pack_path,
140                    options.allow_contract_change,
141                )?;
142                match crate::component_qa_ops::apply_answers_via_component_qa(
143                    config_dir,
144                    Domain::Messaging,
145                    &config.tenant,
146                    Some(&config.team),
147                    &crate::domains::ProviderPack {
148                        pack_id: provider.clone(),
149                        file_name: pack_path
150                            .file_name()
151                            .and_then(|value| value.to_str())
152                            .unwrap_or_default()
153                            .to_string(),
154                        path: pack_path.clone(),
155                        entry_flows: Vec::new(),
156                    },
157                    &provider,
158                    mode,
159                    current_config.as_ref(),
160                    &answers,
161                ) {
162                    Ok(value) => value,
163                    Err(diag) => {
164                        operator_log::error(
165                            module_path!(),
166                            format!(
167                                "component qa failed provider={} flow={} code={} message={}",
168                                provider,
169                                setup_flow,
170                                diag.code.as_str(),
171                                diag.message
172                            ),
173                        );
174                        return Err(anyhow::anyhow!("{diag}"));
175                    }
176                }
177            } else {
178                None
179            };
180            let input = build_input(
181                &provider,
182                &config.tenant,
183                &config.team,
184                public_base_url,
185                Some(&answers),
186            )?;
187            let mut input = input;
188            if let Some(config_value) = qa_config_override.as_ref() {
189                input["config"] = config_value.clone();
190            }
191            if let Some(config_value) = qa_config_override.as_ref() {
192                write_qa_setup_success_record(
193                    &setup_path,
194                    &provider,
195                    &setup_flow,
196                    Some(config_value),
197                )?;
198                if let Err(err) = crate::provider_config_envelope::write_provider_config_envelope(
199                    &providers_root,
200                    &provider,
201                    &setup_flow,
202                    config_value,
203                    &pack_path,
204                    options.backup,
205                ) {
206                    operator_log::warn(
207                        module_path!(),
208                        format!(
209                            "failed to write provider config envelope provider={} flow={}: {err}",
210                            provider, setup_flow
211                        ),
212                    );
213                }
214                if options.verify_webhooks {
215                    let verify_flow = cfg
216                        .verify_flow
217                        .clone()
218                        .unwrap_or_else(|| "verify_webhooks".to_string());
219                    let verify_path = providers_root.join(format!("{provider}.verify.json"));
220                    if !verify_path.exists() || options.force_setup {
221                        let output = runner_integration::run_flow(
222                            &runner,
223                            &pack_path,
224                            &verify_flow,
225                            &input,
226                        )?;
227                        write_run_output(&verify_path, &provider, &verify_flow, &output)?;
228                    }
229                }
230                // Auto-register webhooks with external APIs (Telegram, Slack, Webex)
231                // Merge raw answers into config so webhook-specific fields
232                // (e.g. slack_configuration_token, slack_app_id) are available.
233                {
234                    let mut wh_config = qa_config_override.clone().unwrap_or(json!({}));
235                    if let Some(obj) = answers.as_object() {
236                        for (k, v) in obj {
237                            if wh_config.get(k).is_none() {
238                                wh_config[k] = v.clone();
239                            }
240                        }
241                    }
242                    try_webhook_setup(
243                        config_dir,
244                        &provider,
245                        &config.tenant,
246                        Some(&config.team),
247                        &wh_config,
248                    );
249                }
250                let status_path = providers_root.join(format!("{provider}.status.json"));
251                write_status(&status_path, &provider, &setup_path)?;
252                return Ok(());
253            }
254            let output = runner_integration::run_flow(&runner, &pack_path, &setup_flow, &input)?;
255            write_run_output(&setup_path, &provider, &setup_flow, &output)?;
256            if let Some(config_value) = qa_config_override
257                .or_else(|| extract_config_for_envelope(output.parsed.as_ref(), &input))
258                && let Err(err) = crate::provider_config_envelope::write_provider_config_envelope(
259                    &providers_root,
260                    &provider,
261                    &setup_flow,
262                    &config_value,
263                    &pack_path,
264                    options.backup,
265                )
266            {
267                operator_log::warn(
268                    module_path!(),
269                    format!(
270                        "failed to write provider config envelope provider={} flow={}: {err}",
271                        provider, setup_flow
272                    ),
273                );
274            }
275
276            if options.verify_webhooks {
277                let verify_flow = cfg
278                    .verify_flow
279                    .clone()
280                    .unwrap_or_else(|| "verify_webhooks".to_string());
281                let verify_path = providers_root.join(format!("{provider}.verify.json"));
282                if !verify_path.exists() || options.force_setup {
283                    let output =
284                        runner_integration::run_flow(&runner, &pack_path, &verify_flow, &input)?;
285                    write_run_output(&verify_path, &provider, &verify_flow, &output)?;
286                }
287            }
288
289            // Auto-register webhooks with external APIs (Telegram, Slack, Webex)
290            // Merge raw answers so webhook-specific fields are available.
291            {
292                let mut wh_config = input["config"].clone();
293                if let Some(obj) = answers.as_object() {
294                    for (k, v) in obj {
295                        if wh_config.get(k).is_none() {
296                            wh_config[k] = v.clone();
297                        }
298                    }
299                }
300                try_webhook_setup(
301                    config_dir,
302                    &provider,
303                    &config.tenant,
304                    Some(&config.team),
305                    &wh_config,
306                );
307            }
308            let status_path = providers_root.join(format!("{provider}.status.json"));
309            write_status(&status_path, &provider, &setup_path)?;
310            Ok(())
311        })();
312        if let Err(err) = result {
313            if options.continue_on_error {
314                operator_log::error(
315                    module_path!(),
316                    format!("provider setup failed for {provider}: {err}"),
317                );
318                continue;
319            }
320            return Err(err);
321        }
322    }
323
324    Ok(())
325}
326
327pub(crate) fn write_qa_setup_success_record(
328    path: &Path,
329    provider: &str,
330    flow: &str,
331    config: Option<&Value>,
332) -> anyhow::Result<()> {
333    let record = ProviderRunRecord {
334        provider: provider.to_string(),
335        flow: flow.to_string(),
336        status: "Success".to_string(),
337        success: true,
338        stdout: String::new(),
339        stderr: String::new(),
340        parsed: Some(json!({
341            "status": "Success",
342            "flow_id": flow,
343            "pack_id": provider,
344            "mode": "component-qa",
345            "config": config.cloned().unwrap_or(Value::Null)
346        })),
347        timestamp: Utc::now(),
348    };
349    let bytes = serde_json::to_vec_pretty(&record)?;
350    if let Some(parent) = path.parent() {
351        std::fs::create_dir_all(parent)?;
352    }
353    std::fs::write(path, bytes)?;
354    Ok(())
355}
356
357/// Try to register webhooks with external APIs after provider setup.
358/// This is a best-effort operation — failures are logged but do not block setup.
359fn try_webhook_setup(
360    config_dir: &Path,
361    provider: &str,
362    tenant: &str,
363    team: Option<&str>,
364    config: &Value,
365) {
366    // Resolve pack path for webhook_setup API
367    let pack_path = config_dir.join("packs").join(format!("{provider}.gtpack"));
368    let pack = crate::domains::ProviderPack {
369        pack_id: provider.to_string(),
370        file_name: pack_path
371            .file_name()
372            .and_then(|v| v.to_str())
373            .unwrap_or_default()
374            .to_string(),
375        path: pack_path,
376        entry_flows: Vec::new(),
377    };
378    match crate::onboard::webhook_setup::try_provider_setup_webhook(
379        config_dir,
380        Domain::Messaging,
381        &pack,
382        provider,
383        tenant,
384        team,
385        config,
386    ) {
387        Some(result) => {
388            let ok = result.get("ok").and_then(Value::as_bool).unwrap_or(false);
389            if ok {
390                operator_log::info(
391                    module_path!(),
392                    format!(
393                        "[providers] webhook auto-setup ok provider={} result={}",
394                        provider, result
395                    ),
396                );
397            } else {
398                operator_log::warn(
399                    module_path!(),
400                    format!(
401                        "[providers] webhook auto-setup failed provider={} result={}",
402                        provider, result
403                    ),
404                );
405            }
406        }
407        None => {
408            operator_log::info(
409                module_path!(),
410                format!(
411                    "[providers] webhook auto-setup skipped provider={} (no public_base_url or unsupported)",
412                    provider
413                ),
414            );
415        }
416    }
417}
418
419fn resolve_providers(
420    config: &DemoConfig,
421    filter: Option<Vec<String>>,
422) -> Vec<(String, DemoProviderConfig)> {
423    let mut selected = Vec::new();
424    let Some(map) = config.providers.as_ref() else {
425        return selected;
426    };
427    let filter_set = filter.map(|list| {
428        list.into_iter()
429            .map(|value| value.trim().to_string())
430            .collect::<std::collections::BTreeSet<_>>()
431    });
432    for (name, cfg) in map {
433        if let Some(filter_set) = filter_set.as_ref()
434            && !filter_set.contains(name)
435        {
436            continue;
437        }
438        selected.push((name.clone(), cfg.clone()));
439    }
440    selected
441}
442
443fn resolve_runner_binary(config_dir: &Path, explicit: Option<PathBuf>) -> anyhow::Result<PathBuf> {
444    let explicit = explicit.map(|path| {
445        if path.is_absolute() {
446            path
447        } else {
448            config_dir.join(path)
449        }
450    });
451    bin_resolver::resolve_binary(
452        "greentic-runner",
453        &ResolveCtx {
454            config_dir: config_dir.to_path_buf(),
455            explicit_path: explicit,
456        },
457    )
458}
459
460fn resolve_pack_path(
461    config_dir: &Path,
462    provider: &str,
463    cfg: &DemoProviderConfig,
464) -> anyhow::Result<PathBuf> {
465    if let Some(pack) = cfg.pack.as_ref() {
466        let path = Path::new(pack);
467        return Ok(if path.is_absolute() {
468            path.to_path_buf()
469        } else {
470            config_dir.join(path)
471        });
472    }
473    let default_dir = if config_dir.join("provider-packs").exists() {
474        config_dir.join("provider-packs")
475    } else {
476        config_dir.join("demo").join("provider-packs")
477    };
478    Ok(default_dir.join(format!("{provider}.gtpack")))
479}
480
481fn build_input(
482    pack_id: &str,
483    tenant: &str,
484    team: &str,
485    public_base_url: Option<&str>,
486    answers: Option<&Value>,
487) -> anyhow::Result<Value> {
488    let mut payload = serde_json::json!({
489        "id": pack_id,
490        "tenant": tenant,
491        "team": team,
492        "env": "dev",
493    });
494    let mut config = serde_json::json!({});
495    // Use runtime public_base_url, fall back to answers if not available
496    let effective_url = public_base_url.map(|u| u.to_string()).or_else(|| {
497        answers
498            .and_then(|a| a.get("public_base_url"))
499            .and_then(Value::as_str)
500            .filter(|s| !s.is_empty())
501            .map(|s| s.to_string())
502    });
503    if let Some(ref url) = effective_url {
504        payload["public_base_url"] = Value::String(url.clone());
505        config["public_base_url"] = Value::String(url.clone());
506    }
507    config["id"] = Value::String(pack_id.to_string());
508    payload["config"] = config;
509    payload["msg"] = serde_json::json!({
510        "channel": "setup",
511        "id": format!("{pack_id}.setup"),
512        "message": {
513            "id": format!("{pack_id}.setup_default__collect"),
514            "text": "Collect inputs for setup_default."
515        },
516        "metadata": serde_json::json!({}),
517        "reply_scope": "",
518        "session_id": "setup",
519        "tenant_id": tenant,
520        "text": "Collect inputs for setup_default.",
521        "user_id": "operator"
522    });
523    payload["payload"] = serde_json::json!({
524        "id": format!("{pack_id}-setup_default"),
525        "spec_ref": "assets/setup.yaml"
526    });
527    if let Some(answers) = answers {
528        payload["setup_answers"] = answers.clone();
529        payload["answers_json"] = Value::String(serde_json::to_string(answers)?);
530    }
531    Ok(payload)
532}
533
534fn extract_config_for_envelope(parsed: Option<&Value>, input: &Value) -> Option<Value> {
535    if let Some(parsed) = parsed {
536        if let Some(config) = parsed.get("config") {
537            return Some(config.clone());
538        }
539        return Some(parsed.clone());
540    }
541    input.get("config").cloned()
542}
543
544pub(crate) fn write_run_output(
545    path: &Path,
546    provider: &str,
547    flow: &str,
548    output: &runner_integration::RunnerOutput,
549) -> anyhow::Result<()> {
550    let record = ProviderRunRecord {
551        provider: provider.to_string(),
552        flow: flow.to_string(),
553        status: output
554            .status
555            .code()
556            .map(|code| code.to_string())
557            .unwrap_or_else(|| "terminated".to_string()),
558        success: output.status.success(),
559        stdout: output.stdout.clone(),
560        stderr: output.stderr.clone(),
561        parsed: output.parsed.clone(),
562        timestamp: Utc::now(),
563    };
564    let bytes = serde_json::to_vec_pretty(&record)?;
565    if let Some(parent) = path.parent() {
566        std::fs::create_dir_all(parent)?;
567    }
568    std::fs::write(path, bytes)?;
569    Ok(())
570}
571
572pub(crate) fn write_run_result(
573    path: &Path,
574    provider: &str,
575    flow: &str,
576    result: &RunResult,
577) -> anyhow::Result<()> {
578    let parsed = serde_json::to_value(result).ok();
579    let record = ProviderRunRecord {
580        provider: provider.to_string(),
581        flow: flow.to_string(),
582        status: format!("{:?}", result.status),
583        success: result.status == RunStatus::Success,
584        stdout: String::new(),
585        stderr: result.error.clone().unwrap_or_default(),
586        parsed,
587        timestamp: Utc::now(),
588    };
589    let bytes = serde_json::to_vec_pretty(&record)?;
590    if let Some(parent) = path.parent() {
591        std::fs::create_dir_all(parent)?;
592    }
593    std::fs::write(path, bytes)?;
594    Ok(())
595}
596
597fn write_status(path: &Path, provider: &str, setup_path: &Path) -> anyhow::Result<()> {
598    let status = ProviderStatus {
599        provider: provider.to_string(),
600        setup_path: setup_path.to_path_buf(),
601        updated_at: Utc::now(),
602    };
603    let bytes = serde_json::to_vec_pretty(&status)?;
604    if let Some(parent) = path.parent() {
605        std::fs::create_dir_all(parent)?;
606    }
607    std::fs::write(path, bytes)?;
608    Ok(())
609}
610
611#[derive(Serialize)]
612struct ProviderRunRecord {
613    provider: String,
614    flow: String,
615    status: String,
616    success: bool,
617    stdout: String,
618    stderr: String,
619    parsed: Option<Value>,
620    timestamp: DateTime<Utc>,
621}
622
623#[derive(Serialize)]
624struct ProviderStatus {
625    provider: String,
626    setup_path: PathBuf,
627    updated_at: DateTime<Utc>,
628}