use anyhow::Result;
use std::fs;
use super::cli::{AzureCli, EventHubInputConfig, ParquetOutputConfig};
use super::context::DeployContext;
/// Bicep template embedded at compile time; written to a temp file and passed
/// to `az deployment` in Phase 1. Parameterized by location, envName,
/// storageAccountName, eventHubNamespace, and containerImage (see
/// `deploy_bicep_template`).
const BICEP_TEMPLATE: &str = include_str!("../../../../templates/azure/otlp.bicep");
/// Stream Analytics query installed in Phase 2. Fans records out from the
/// single `[eventhubinput]` input to four outputs (`logsoutput`,
/// `tracesoutput`, `gaugeoutput`, `sumoutput`) by matching on the record's
/// `signal_type` field. Output names must match those created in
/// `create_stream_analytics_job`; the query text itself is passed verbatim
/// to Azure and must not be reformatted.
const STREAM_ANALYTICS_QUERY: &str = r#"
-- Route logs by signal_type
SELECT
*
INTO
[logsoutput]
FROM
[eventhubinput]
WHERE
signal_type = 'logs'
-- Route traces by signal_type
SELECT
*
INTO
[tracesoutput]
FROM
[eventhubinput]
WHERE
signal_type = 'traces'
-- Route gauge metrics by signal_type
SELECT
*
INTO
[gaugeoutput]
FROM
[eventhubinput]
WHERE
signal_type = 'metrics_gauge'
-- Route sum metrics by signal_type
SELECT
*
INTO
[sumoutput]
FROM
[eventhubinput]
WHERE
signal_type = 'metrics_sum'
"#;
/// Phase 1: deploy the embedded Bicep template into `ctx.resource_group`,
/// creating the resource group first when it does not already exist.
///
/// The template is materialized to a per-environment file in the system temp
/// directory (the Azure CLI requires a template on disk) and removed again on
/// a best-effort basis whether the deployment succeeded or failed.
///
/// # Errors
///
/// Returns an error if the resource-group check/creation fails, the temp file
/// cannot be written, its path is not valid UTF-8, or the deployment itself
/// fails. A failed cleanup of the temp file is only warned about, never fatal.
pub fn deploy_bicep_template(
    cli: &AzureCli,
    ctx: &DeployContext,
    container_image: &str,
) -> Result<()> {
    eprintln!("\n==> Phase 1: Deploying Bicep template");

    // Ensure the target resource group exists before deploying into it.
    if cli.resource().group_exists(&ctx.resource_group)? {
        eprintln!(" Resource group exists: {}", ctx.resource_group);
    } else {
        eprintln!(" Creating resource group: {}", ctx.resource_group);
        cli.resource().create_group(&ctx.resource_group)?;
    }

    eprintln!(" Deploying storage account and Event Hub...");

    // Write the embedded template under a per-environment name in the temp dir.
    let bicep_file = std::env::temp_dir().join(format!("otlp-azure-{}.bicep", ctx.env_name));
    fs::write(&bicep_file, BICEP_TEMPLATE)?;

    // The CLI wrapper takes &str, so the path must be valid UTF-8.
    let bicep_file_str = match bicep_file.to_str() {
        Some(path) => path,
        None => {
            return Err(anyhow::anyhow!(
                "Temporary file path contains invalid UTF-8: {:?}. \
This may indicate a system configuration issue.",
                bicep_file
            ))
        }
    };

    // Defer `?` on the deployment result until after cleanup so the temp
    // file is removed on both the success and the failure path.
    let deploy_result = cli.resource().deploy_bicep(
        &ctx.resource_group,
        bicep_file_str,
        &[
            ("location", &ctx.region),
            ("envName", &ctx.env_name),
            ("storageAccountName", &ctx.storage_account),
            ("eventHubNamespace", &ctx.eventhub_namespace),
            ("containerImage", container_image),
        ],
    );

    // Best-effort cleanup: report a failure but never turn it into an error.
    if let Err(cleanup_err) = fs::remove_file(&bicep_file) {
        eprintln!(
            " Warning: Failed to clean up temporary Bicep template at {}: {}",
            bicep_file.display(),
            cleanup_err
        );
        eprintln!(
            " This file contains infrastructure configuration. Consider removing it manually."
        );
    }

    deploy_result?;
    eprintln!(" ✓ Bicep deployment complete");
    Ok(())
}
/// Phase 2: create (idempotently) and fully configure the Stream Analytics
/// job: one Event Hub input, four Parquet blob outputs, and the routing
/// query in `STREAM_ANALYTICS_QUERY`.
///
/// Output names are `{prefix}output` (e.g. `logsoutput`) and must stay in
/// sync with the `INTO [...]` targets of `STREAM_ANALYTICS_QUERY`.
///
/// # Errors
///
/// Propagates any failure from the Azure CLI wrappers: job existence check or
/// creation, connection-string retrieval, input/output configuration, or
/// setting the query.
pub fn create_stream_analytics_job(cli: &AzureCli, ctx: &DeployContext) -> Result<()> {
    eprintln!("\n==> Phase 2: Creating Stream Analytics job");
    let sa = cli.stream_analytics();

    // Idempotent: only create the job when it is not already present.
    if !sa.job_exists(&ctx.stream_analytics_job, &ctx.resource_group)? {
        eprintln!(" Creating job: {}", ctx.stream_analytics_job);
        sa.create_job(&ctx.stream_analytics_job, &ctx.resource_group)?;
    } else {
        eprintln!(" Job exists: {}", ctx.stream_analytics_job);
    }

    eprintln!(" Retrieving connection strings...");
    let eventhub_conn = cli
        .eventhub()
        .get_connection_string(&ctx.eventhub_namespace, &ctx.resource_group)?;
    let storage_conn = cli
        .storage()
        .get_connection_string(&ctx.storage_account, &ctx.resource_group)?;

    eprintln!(" Configuring Event Hub input...");
    let input_config = EventHubInputConfig::new(
        ctx.eventhub_namespace.clone(),
        ctx.eventhub_name.clone(),
        eventhub_conn,
    )?;
    sa.create_input(
        &ctx.stream_analytics_job,
        &ctx.resource_group,
        &input_config,
    )?;

    eprintln!(" Configuring Parquet outputs...");
    // (output name prefix, destination blob container). A fixed-size array
    // avoids the needless heap allocation of `vec!` (clippy::useless_vec).
    const OUTPUTS: [(&str, &str); 4] = [
        ("logs", "logs"),
        ("traces", "traces"),
        ("gauge", "metrics-gauge"),
        ("sum", "metrics-sum"),
    ];
    for (name, container) in OUTPUTS {
        eprintln!(" Creating output: {} → {}/", name, container);
        let output_config = ParquetOutputConfig::new(
            format!("{}output", name),
            ctx.storage_account.clone(),
            container.to_string(),
            // Each output needs its own owned copy of the connection string.
            storage_conn.clone(),
        )?;
        sa.create_output(
            &ctx.stream_analytics_job,
            &ctx.resource_group,
            &output_config,
        )?;
    }

    eprintln!(" Setting Stream Analytics query...");
    sa.set_query(
        &ctx.stream_analytics_job,
        &ctx.resource_group,
        STREAM_ANALYTICS_QUERY,
    )?;
    eprintln!(" ✓ Stream Analytics job configured");
    Ok(())
}
/// Phase 3: start the Stream Analytics job configured in Phase 2.
///
/// # Errors
///
/// Propagates any failure from the underlying Azure CLI `start_job` call.
pub fn start_stream_analytics_job(cli: &AzureCli, ctx: &DeployContext) -> Result<()> {
    eprintln!("\n==> Phase 3: Starting Stream Analytics job");
    let sa = cli.stream_analytics();
    sa.start_job(&ctx.stream_analytics_job, &ctx.resource_group)?;
    eprintln!(" ✓ Job started");
    Ok(())
}