use anyhow::{Context, Result};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use reqwest::Client;
use crate::auth;
use crate::client;
use super::cells::cell_source_string;
const LIVY_API_VERSION: &str = "2023-12-01";
pub async fn run_exec(
http: &Client,
reference: &str,
code: Option<&str>,
cell_index: Option<usize>,
lakehouse: Option<&str>,
) -> Result<()> {
let (ws_name, nb_name) = client::parse_ref(reference)?;
let ws_id = client::resolve_workspace(http, ws_name).await?;
let nb = client::resolve_item(http, &ws_id, nb_name, "Notebook").await?;
let exec_code = match (code, cell_index) {
(Some(c), _) => c.to_string(),
(None, Some(idx)) => {
let def = client::get_definition(http, &ws_id, &nb.id, "ipynb").await?;
let parts = def.definition.context("No definition returned")?.parts;
let part = parts
.iter()
.find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
.or_else(|| parts.first())
.context("No parts in definition")?;
let decoded = BASE64
.decode(&part.payload)
.context("Failed to decode base64 payload")?;
let ipynb: serde_json::Value =
serde_json::from_slice(&decoded).context("Failed to parse ipynb JSON")?;
let cells = ipynb
.get("cells")
.and_then(|c| c.as_array())
.context("No cells array")?;
let cell = cells
.get(idx)
.context(format!("Cell index {} out of range", idx))?;
cell_source_string(cell)
}
(None, None) => anyhow::bail!("Provide --code or a cell index"),
};
let lh_id = match lakehouse {
Some(lh_name) => {
let lh = client::resolve_item(http, &ws_id, lh_name, "Lakehouse").await?;
lh.id
}
None => {
detect_lakehouse(http, &ws_id, &nb.id).await
.context("No lakehouse attached to notebook. Use --lakehouse <name> to specify one.")?
}
};
let kernel = detect_kernel(http, &ws_id, &nb.id).await.unwrap_or_else(|_| "jupyter".to_string());
let livy_kind = if kernel == "synapse_pyspark" { "pyspark" } else { "python" };
eprintln!(" Creating Livy session (kind={})...", livy_kind);
let token = auth::get_fabric_token()?;
let session_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions",
ws_id, lh_id, LIVY_API_VERSION
);
let session_body = serde_json::json!({
"kind": livy_kind,
"name": format!("nb-cli-{}", nb.display_name)
});
let resp = http
.post(&session_url)
.bearer_auth(&token)
.json(&session_body)
.send()
.await
.context("Failed to create Livy session")?;
let status = resp.status();
if !status.is_success() && status.as_u16() != 201 {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Failed to create Livy session ({}): {}", status, body);
}
let session: serde_json::Value = resp.json().await?;
let session_id = session
.get("id")
.map(|v| match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
_ => String::new(),
})
.filter(|s| !s.is_empty())
.context("No session ID returned")?;
eprintln!(" Session {} created; waiting for idle state...", session_id);
wait_for_session_idle(http, &ws_id, &lh_id, &session_id).await?;
eprintln!(" Submitting code...");
let stmt_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements",
ws_id, lh_id, LIVY_API_VERSION, session_id
);
let stmt_body = serde_json::json!({
"code": exec_code,
"kind": livy_kind
});
let token = auth::get_fabric_token()?;
let stmt_resp = http
.post(&stmt_url)
.bearer_auth(&token)
.json(&stmt_body)
.send()
.await
.context("Failed to submit statement")?;
let stmt: serde_json::Value = stmt_resp.json().await?;
let stmt_id = stmt
.get("id")
.and_then(|v| v.as_u64())
.context("No statement ID returned")?;
let result = poll_statement(http, &ws_id, &lh_id, &session_id, stmt_id).await?;
let output_status = result
.pointer("/output/status")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if output_status == "ok" {
if let Some(text) = result.pointer("/output/data/text~1plain").and_then(|v| v.as_str()) {
println!("{}", text);
}
} else {
let ename = result.pointer("/output/ename").and_then(|v| v.as_str()).unwrap_or("");
let evalue = result.pointer("/output/evalue").and_then(|v| v.as_str()).unwrap_or("");
eprintln!(" Error: {} {}", ename, evalue);
if let Some(traceback) = result.pointer("/output/traceback").and_then(|v| v.as_array()) {
for line in traceback {
if let Some(s) = line.as_str() {
eprintln!(" {}", s);
}
}
}
}
let _ = cleanup_session(http, &ws_id, &lh_id, &session_id).await;
Ok(())
}
async fn cleanup_session(
http: &Client,
ws_id: &str,
lh_id: &str,
session_id: &str,
) -> Result<()> {
let token = auth::get_fabric_token()?;
let url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
ws_id, lh_id, LIVY_API_VERSION, session_id
);
let _ = http.delete(&url).bearer_auth(&token).send().await;
eprintln!(" Session cleaned up");
Ok(())
}
async fn wait_for_session_idle(
http: &Client,
ws_id: &str,
lh_id: &str,
session_id: &str,
) -> Result<()> {
let check_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
ws_id, lh_id, LIVY_API_VERSION, session_id
);
for i in 0..120 {
let token = auth::get_fabric_token()?;
let resp = http
.get(&check_url)
.bearer_auth(&token)
.send()
.await?;
let session: serde_json::Value = resp.json().await?;
let state = session.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");
match state {
"idle" => return Ok(()),
"starting" | "not_started" => {
if i % 6 == 0 && i > 0 {
eprintln!(" Still starting... ({}s)", i * 5);
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
"dead" | "error" | "killed" => {
anyhow::bail!("Session entered {} state", state);
}
_ => {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
anyhow::bail!("Session did not become idle within 10 minutes")
}
async fn poll_statement(
http: &Client,
ws_id: &str,
lh_id: &str,
session_id: &str,
stmt_id: u64,
) -> Result<serde_json::Value> {
let stmt_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements/{}",
ws_id, lh_id, LIVY_API_VERSION, session_id, stmt_id
);
for _ in 0..120 {
let token = auth::get_fabric_token()?;
let resp = http
.get(&stmt_url)
.bearer_auth(&token)
.send()
.await?;
let result: serde_json::Value = resp.json().await?;
let state = result.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");
match state {
"available" => return Ok(result),
"waiting" | "running" => {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
"error" | "cancelled" | "cancelling" => {
return Ok(result);
}
_ => {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
anyhow::bail!("Statement did not complete within timeout")
}
async fn detect_kernel(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
let def = client::get_definition(http, ws_id, item_id, "ipynb").await?;
let parts = def.definition.context("No definition")?.parts;
let part = parts
.iter()
.find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
.or_else(|| parts.first())
.context("No parts")?;
let decoded = BASE64.decode(&part.payload)?;
let ipynb: serde_json::Value = serde_json::from_slice(&decoded)?;
let kernel = ipynb
.pointer("/metadata/kernel_info/name")
.or_else(|| ipynb.pointer("/metadata/kernelspec/name"))
.and_then(|v| v.as_str())
.unwrap_or("jupyter")
.to_string();
Ok(kernel)
}
async fn detect_lakehouse(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
let def = client::get_definition(http, ws_id, item_id, "ipynb").await?;
let parts = def.definition.context("No definition")?.parts;
let part = parts
.iter()
.find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
.or_else(|| parts.first())
.context("No parts")?;
let decoded = BASE64.decode(&part.payload)?;
let ipynb: serde_json::Value = serde_json::from_slice(&decoded)?;
ipynb
.pointer("/metadata/dependencies/lakehouse/default_lakehouse")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.context("No default_lakehouse in notebook metadata")
}