use anyhow::{Context, Result};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use reqwest::Client;
use std::io::Read as _;
use std::io::Write as _;
use crate::auth;
use crate::client;
use super::cells::cell_source_string;
/// API version path segment used in every Livy endpoint URL below.
const LIVY_API_VERSION: &str = "2023-12-01";

/// Braille glyphs cycled through by `spin` to animate progress on stderr.
const SPINNER: &[&str] = &[
    "⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏",
];

// ANSI SGR escape sequences used to style stderr output.
/// Reset all text attributes.
const RESET: &str = "\x1b[0m";
/// Dim / faint text.
const DIM: &str = "\x1b[2m";
/// Red foreground.
const RED: &str = "\x1b[31m";
/// Green foreground.
const GREEN: &str = "\x1b[32m";
/// Cyan foreground.
const CYAN: &str = "\x1b[36m";
/// Redraws the single-line progress spinner on stderr.
///
/// `frame` picks the glyph (wrapping around `SPINNER`), and `msg` is shown
/// dimmed beside it. The leading `\r` moves back to the start of the line so
/// successive calls overwrite one another. No newline is written; stderr is
/// flushed on a best-effort basis, and flush errors are ignored.
fn spin(frame: usize, msg: &str) {
    let glyph = SPINNER[frame % SPINNER.len()];
    eprint!("\r {CYAN}{}{RESET} {DIM}{msg}{RESET} ", glyph);
    std::io::stderr().flush().ok();
}
/// Finishes the current spinner line with a status marker and a message.
///
/// Writes a green `ok` when `ok` is true, otherwise a red `!!`, followed by
/// the dimmed `msg`. The line is terminated so later output starts fresh.
fn spin_done(msg: &str, ok: bool) {
    let icon = match ok {
        true => format!("{GREEN}ok{RESET}"),
        false => format!("{RED}!!{RESET}"),
    };
    eprintln!("\r {icon} {DIM}{msg}{RESET} ");
}
/// Runs an ad-hoc PySpark snippet against a lakehouse in a temporary session.
///
/// `reference` names the target as `Workspace/Lakehouse`. Any trailing
/// `.Lakehouse` suffix on the item name is trimmed before lookup. When `code`
/// is exactly `"-"`, the snippet is read from stdin instead. The snippet runs
/// in a `pyspark` Livy session named `nb-cli-<lakehouse>`.
///
/// # Errors
/// Fails if the reference cannot be parsed, stdin cannot be read, or the code
/// is empty/whitespace-only. It also fails if the workspace or lakehouse cannot
/// be resolved, or if the session run fails.
pub async fn run_exec_quick(
    http: &Client,
    reference: &str,
    code: &str,
) -> Result<()> {
    let (ws_name, lh_name) = client::parse_ref(reference)
        .context("Expected format: \"Workspace/Lakehouse\" or \"Workspace.Workspace/LH.Lakehouse\"\n nb exec code \"MyWS/MyLH.Lakehouse\" \"print('hello')\"")?;
    let lakehouse = lh_name.trim_end_matches(".Lakehouse");

    // "-" is the conventional "read from stdin" sentinel.
    let exec_code = match code {
        "-" => {
            let mut from_stdin = String::new();
            std::io::stdin()
                .read_to_string(&mut from_stdin)
                .context("Failed to read code from stdin")?;
            from_stdin
        }
        literal => literal.to_string(),
    };
    if exec_code.trim().is_empty() {
        anyhow::bail!("No code to execute.\n nb exec code \"Workspace/Lakehouse\" \"print('hello')\"");
    }

    let ws_id = client::resolve_workspace(http, ws_name).await?;
    let lh_item = client::resolve_item(http, &ws_id, lakehouse, "Lakehouse").await?;
    run_in_session(
        http,
        &ws_id,
        &lh_item.id,
        "pyspark",
        &format!("nb-cli-{}", lakehouse),
        &exec_code,
    )
    .await
}
pub async fn run_exec(
http: &Client,
reference: &str,
code: Option<&str>,
cell_index: Option<usize>,
lakehouse: Option<&str>,
) -> Result<()> {
let (ws_name, nb_name) = client::parse_ref(reference)?;
let ws_id = client::resolve_workspace(http, ws_name).await?;
let nb = client::resolve_item(http, &ws_id, nb_name, "Notebook").await?;
let exec_code = match (code, cell_index) {
(Some(c), _) => c.to_string(),
(None, Some(idx)) => {
let def = client::get_definition(http, &ws_id, &nb.id, "ipynb").await?;
let parts = def.definition.context("No definition returned")?.parts;
let part = parts
.iter()
.find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
.or_else(|| parts.first())
.context("No parts in definition")?;
let decoded = BASE64
.decode(&part.payload)
.context("Failed to decode base64 payload")?;
let ipynb: serde_json::Value =
serde_json::from_slice(&decoded).context("Failed to parse ipynb JSON")?;
let cells = ipynb
.get("cells")
.and_then(|c| c.as_array())
.context("No cells array")?;
let cell = cells
.get(idx)
.context(format!("Cell index {} out of range", idx))?;
cell_source_string(cell)
}
(None, None) => anyhow::bail!(
"Provide --code or a cell index.\n nb exec cell \"Workspace/Notebook\" 0"
),
};
let lh_id = match lakehouse {
Some(lh_name) => {
let lh = client::resolve_item(http, &ws_id, lh_name, "Lakehouse").await?;
lh.id
}
None => {
detect_lakehouse(http, &ws_id, &nb.id).await
.context("No lakehouse attached to notebook. Use --lakehouse <name> to specify one.")?
}
};
let kernel = detect_kernel(http, &ws_id, &nb.id).await.unwrap_or_else(|_| "jupyter".to_string());
let kind = if kernel == "synapse_pyspark" { "pyspark" } else { "python" };
let session_name = format!("nb-cli-{}", nb.display_name);
run_in_session(http, &ws_id, &lh_id, kind, &session_name, &exec_code).await
}
/// Creates a Livy session on a lakehouse, runs `code` in it, then deletes it.
///
/// Progress goes to stderr via the spinner helpers. When the run finishes, a
/// one-line summary (short session ID, status, wall-clock duration) is printed.
/// Execution races against Ctrl-C; on interrupt the in-flight work is dropped
/// and the run is reported as failed. Once a session ID has been obtained, the
/// session is deleted on a best-effort basis regardless of outcome, and cleanup
/// errors are ignored.
///
/// * `kind` — Livy session/statement kind, e.g. `"pyspark"`.
/// * `session_name` — display name given to the created session.
///
/// # Errors
/// Fails if:
/// - no token can be obtained;
/// - session creation fails or returns a non-2xx status;
/// - the response cannot be parsed or carries no session ID;
/// - execution fails;
/// - the user interrupts.
async fn run_in_session(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    kind: &str,
    session_name: &str,
    code: &str,
) -> Result<()> {
    let start = std::time::Instant::now();
    eprintln!(" {DIM}Executing...{RESET}");
    spin(0, "Creating session...");
    let token = auth::get_fabric_token()?;
    let session_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions",
        ws_id, lh_id, LIVY_API_VERSION
    );
    let session_body = serde_json::json!({
        "kind": kind,
        "name": session_name
    });
    let resp = http
        .post(&session_url)
        .bearer_auth(&token)
        .json(&session_body)
        .send()
        .await
        .context("Failed to create session")?;
    let status = resp.status();
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();
        spin_done("Failed to create session", false);
        anyhow::bail!("Failed to create session ({}): {}", status, body);
    }
    let session: serde_json::Value = resp
        .json()
        .await
        .context("Failed to parse session creation response")?;
    // Livy may return the ID as a string (UUID) or as a number.
    let session_id = session
        .get("id")
        .map(|v| match v {
            serde_json::Value::String(s) => s.clone(),
            serde_json::Value::Number(n) => n.to_string(),
            _ => String::new(),
        })
        .filter(|s| !s.is_empty())
        .context("No session ID returned")?;
    // Take at most 8 characters. Byte-slicing `[..8]` would panic on short IDs
    // (e.g. numeric "0") or on a non-char boundary.
    let short_id: String = session_id.chars().take(8).collect();
    spin_done(&format!("Session {short_id} created"), true);
    let result = tokio::select! {
        res = execute_and_print(http, ws_id, lh_id, &session_id, kind, code, &start) => res,
        _ = tokio::signal::ctrl_c() => {
            eprintln!("\n {RED}Interrupted{RESET}");
            Err(anyhow::anyhow!("Interrupted"))
        }
    };
    // Best-effort: a cleanup failure must not mask the execution result.
    let _ = cleanup_session(http, ws_id, lh_id, &session_id).await;
    let elapsed = start.elapsed();
    let ok = result.is_ok();
    let status_str = if ok { format!("{GREEN}ok{RESET}") } else { format!("{RED}error{RESET}") };
    eprintln!(" {DIM}---{RESET}");
    eprintln!(" {DIM}session{RESET} {short_id} {DIM}status{RESET} {status_str} {DIM}duration{RESET} {:.1}s", elapsed.as_secs_f64());
    result
}
/// Waits for the session to go idle, submits `code` as a statement, and reports the result.
///
/// If the statement's output status is `ok`, its `text/plain` output (if any)
/// is printed to stdout. Otherwise the error name/value and each traceback line
/// are printed to stderr.
///
/// # Errors
/// Fails if:
/// - the session never becomes idle;
/// - the statement cannot be submitted, including a non-2xx response;
/// - no statement ID is returned;
/// - polling fails;
/// - the output status is anything other than `ok`.
async fn execute_and_print(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    session_id: &str,
    kind: &str,
    code: &str,
    start: &std::time::Instant,
) -> Result<()> {
    wait_for_session_idle(http, ws_id, lh_id, session_id, start).await?;
    spin(0, "Submitting code...");
    let stmt_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements",
        ws_id, lh_id, LIVY_API_VERSION, session_id
    );
    let stmt_body = serde_json::json!({
        "code": code,
        "kind": kind
    });
    let token = auth::get_fabric_token()?;
    let stmt_resp = http
        .post(&stmt_url)
        .bearer_auth(&token)
        .json(&stmt_body)
        .send()
        .await
        .context("Failed to submit statement")?;
    // Surface the server's error body instead of a misleading
    // "No statement ID returned" when submission is rejected.
    let status = stmt_resp.status();
    if !status.is_success() {
        let body = stmt_resp.text().await.unwrap_or_default();
        spin_done("Failed to submit statement", false);
        anyhow::bail!("Failed to submit statement ({}): {}", status, body);
    }
    let stmt: serde_json::Value = stmt_resp
        .json()
        .await
        .context("Failed to parse statement submission response")?;
    let stmt_id = stmt
        .get("id")
        .and_then(|v| v.as_u64())
        .context("No statement ID returned")?;
    let result = poll_statement(http, ws_id, lh_id, session_id, stmt_id, start).await?;
    let output_status = result
        .pointer("/output/status")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");
    if output_status == "ok" {
        spin_done("Done", true);
        // JSON Pointer escapes '/' as "~1", so this reads the "text/plain" key.
        if let Some(text) = result.pointer("/output/data/text~1plain").and_then(|v| v.as_str()) {
            println!("{}", text);
        }
        Ok(())
    } else {
        let ename = result.pointer("/output/ename").and_then(|v| v.as_str()).unwrap_or("");
        let evalue = result.pointer("/output/evalue").and_then(|v| v.as_str()).unwrap_or("");
        spin_done(&format!("{ename}: {evalue}"), false);
        if let Some(traceback) = result.pointer("/output/traceback").and_then(|v| v.as_array()) {
            for line in traceback {
                if let Some(s) = line.as_str() {
                    eprintln!(" {DIM}{s}{RESET}");
                }
            }
        }
        anyhow::bail!("Code execution failed: {} {}", ename, evalue)
    }
}
/// Best-effort deletion of a Livy session.
///
/// Sends a `DELETE` to the session's URL. Failing to obtain a token is
/// returned as an error. The outcome of the HTTP request itself is
/// deliberately discarded, so a failed delete still yields `Ok(())`.
async fn cleanup_session(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    session_id: &str,
) -> Result<()> {
    let token = auth::get_fabric_token()?;
    let session_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
        ws_id, lh_id, LIVY_API_VERSION, session_id
    );
    let delete_request = http.delete(&session_url).bearer_auth(&token);
    // Ignore both transport errors and the response: cleanup must never fail the run.
    let _ = delete_request.send().await;
    Ok(())
}
async fn wait_for_session_idle(
http: &Client,
ws_id: &str,
lh_id: &str,
session_id: &str,
start: &std::time::Instant,
) -> Result<()> {
let check_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
ws_id, lh_id, LIVY_API_VERSION, session_id
);
for i in 0..120 {
let token = auth::get_fabric_token()?;
let resp = http
.get(&check_url)
.bearer_auth(&token)
.send()
.await?;
let session: serde_json::Value = resp.json().await?;
let state = session.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");
match state {
"idle" => {
spin_done(&format!("Session ready ({:.0}s)", start.elapsed().as_secs_f64()), true);
return Ok(());
}
"starting" | "not_started" => {
spin(i, &format!("Starting session ({:.0}s)...", start.elapsed().as_secs_f64()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
"dead" | "error" | "killed" => {
spin_done(&format!("Session {state}"), false);
anyhow::bail!("Session entered {} state", state);
}
_ => {
spin(i, &format!("{state} ({:.0}s)...", start.elapsed().as_secs_f64()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
spin_done("Session timed out", false);
anyhow::bail!("Session did not become idle within 10 minutes")
}
async fn poll_statement(
http: &Client,
ws_id: &str,
lh_id: &str,
session_id: &str,
stmt_id: u64,
start: &std::time::Instant,
) -> Result<serde_json::Value> {
let stmt_url = format!(
"https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements/{}",
ws_id, lh_id, LIVY_API_VERSION, session_id, stmt_id
);
for i in 0..120 {
let token = auth::get_fabric_token()?;
let resp = http
.get(&stmt_url)
.bearer_auth(&token)
.send()
.await?;
let result: serde_json::Value = resp.json().await?;
let state = result.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");
match state {
"available" => return Ok(result),
"waiting" | "running" => {
spin(i, &format!("Running ({:.0}s)...", start.elapsed().as_secs_f64()));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
"error" | "cancelled" | "cancelling" => {
return Ok(result);
}
_ => {
spin(i, &format!("{state} ({:.0}s)...", start.elapsed().as_secs_f64()));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
spin_done("Statement timed out", false);
anyhow::bail!("Statement did not complete within timeout")
}
/// Reads the kernel name from a notebook's ipynb metadata.
///
/// Prefers `metadata.kernel_info.name` and falls back to
/// `metadata.kernelspec.name` only when the first key is absent. The fallback
/// does not apply when the first key is present but not a string. Returns
/// `"jupyter"` when no string value is found.
///
/// # Errors
/// Fails if the definition cannot be fetched, has no parts, or its payload is
/// not valid base64-encoded JSON.
async fn detect_kernel(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
    let definition = client::get_definition(http, ws_id, item_id, "ipynb")
        .await?
        .definition
        .context("No definition")?;
    let parts = definition.parts;
    let notebook_part = parts
        .iter()
        .find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
        .or(parts.first())
        .context("No parts")?;
    let raw_json = BASE64.decode(&notebook_part.payload)?;
    let notebook: serde_json::Value = serde_json::from_slice(&raw_json)?;
    let kernel_name = ["/metadata/kernel_info/name", "/metadata/kernelspec/name"]
        .iter()
        .find_map(|ptr| notebook.pointer(ptr))
        .and_then(serde_json::Value::as_str)
        .unwrap_or("jupyter");
    Ok(kernel_name.to_string())
}
/// Returns the notebook's default lakehouse ID from its ipynb metadata.
///
/// Reads `metadata.dependencies.lakehouse.default_lakehouse`. A missing,
/// non-string, or empty value is treated as "no lakehouse attached".
///
/// # Errors
/// Fails if the definition cannot be fetched, has no parts, or its payload is
/// not valid base64-encoded JSON. Also fails if no non-empty default lakehouse
/// is present.
async fn detect_lakehouse(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
    let definition = client::get_definition(http, ws_id, item_id, "ipynb")
        .await?
        .definition
        .context("No definition")?;
    let parts = definition.parts;
    let notebook_part = parts
        .iter()
        .find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
        .or(parts.first())
        .context("No parts")?;
    let raw_json = BASE64.decode(&notebook_part.payload)?;
    let notebook: serde_json::Value = serde_json::from_slice(&raw_json)?;
    let default_lakehouse = notebook
        .pointer("/metadata/dependencies/lakehouse/default_lakehouse")
        .and_then(serde_json::Value::as_str)
        .filter(|id| !id.is_empty())
        .context("No default_lakehouse in notebook metadata")?;
    Ok(default_lakehouse.to_owned())
}