nb-fabric 0.3.0

CLI for managing Microsoft Fabric notebooks: create notebooks, edit cells, execute code interactively, schedule notebooks, and query OneLake data
// #region Imports
use anyhow::{Context, Result};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use reqwest::Client;

use crate::auth;
use crate::client;
use super::cells::cell_source_string;
// #endregion

// #region Variables
/// Livy API version segment embedded in every Fabric Livy endpoint URL built in this module
/// (`.../livyApi/versions/{LIVY_API_VERSION}/sessions...`).
const LIVY_API_VERSION: &str = "2023-12-01";
// #endregion

// #region Functions

/// Handle `nb exec <workspace/notebook> --code "<code>"` command.
/// Executes code in a Livy session against the notebook's attached lakehouse.
/// Returns the cell output.
pub async fn run_exec(
    http: &Client,
    reference: &str,
    code: Option<&str>,
    cell_index: Option<usize>,
    lakehouse: Option<&str>,
) -> Result<()> {
    let (ws_name, nb_name) = client::parse_ref(reference)?;
    let ws_id = client::resolve_workspace(http, ws_name).await?;
    let nb = client::resolve_item(http, &ws_id, nb_name, "Notebook").await?;

    // Resolve the code to execute
    let exec_code = match (code, cell_index) {
        (Some(c), _) => c.to_string(),
        (None, Some(idx)) => {
            let def = client::get_definition(http, &ws_id, &nb.id, "ipynb").await?;
            let parts = def.definition.context("No definition returned")?.parts;
            let part = parts
                .iter()
                .find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
                .or_else(|| parts.first())
                .context("No parts in definition")?;

            let decoded = BASE64
                .decode(&part.payload)
                .context("Failed to decode base64 payload")?;
            let ipynb: serde_json::Value =
                serde_json::from_slice(&decoded).context("Failed to parse ipynb JSON")?;

            let cells = ipynb
                .get("cells")
                .and_then(|c| c.as_array())
                .context("No cells array")?;

            let cell = cells
                .get(idx)
                .context(format!("Cell index {} out of range", idx))?;

            cell_source_string(cell)
        }
        (None, None) => anyhow::bail!("Provide --code or a cell index"),
    };

    // Resolve lakehouse ID (from --lakehouse flag, or detect from notebook metadata)
    let lh_id = match lakehouse {
        Some(lh_name) => {
            let lh = client::resolve_item(http, &ws_id, lh_name, "Lakehouse").await?;
            lh.id
        }
        None => {
            detect_lakehouse(http, &ws_id, &nb.id).await
                .context("No lakehouse attached to notebook. Use --lakehouse <name> to specify one.")?
        }
    };

    // Detect kernel type to use correct Livy kind
    let kernel = detect_kernel(http, &ws_id, &nb.id).await.unwrap_or_else(|_| "jupyter".to_string());
    let livy_kind = if kernel == "synapse_pyspark" { "pyspark" } else { "python" };

    eprintln!("  Creating Livy session (kind={})...", livy_kind);

    // Create Livy session
    let token = auth::get_fabric_token()?;
    let session_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions",
        ws_id, lh_id, LIVY_API_VERSION
    );

    let session_body = serde_json::json!({
        "kind": livy_kind,
        "name": format!("nb-cli-{}", nb.display_name)
    });

    let resp = http
        .post(&session_url)
        .bearer_auth(&token)
        .json(&session_body)
        .send()
        .await
        .context("Failed to create Livy session")?;

    let status = resp.status();
    if !status.is_success() && status.as_u16() != 201 {
        let body = resp.text().await.unwrap_or_default();
        anyhow::bail!("Failed to create Livy session ({}): {}", status, body);
    }

    let session: serde_json::Value = resp.json().await?;
    let session_id = session
        .get("id")
        .map(|v| match v {
            serde_json::Value::String(s) => s.clone(),
            serde_json::Value::Number(n) => n.to_string(),
            _ => String::new(),
        })
        .filter(|s| !s.is_empty())
        .context("No session ID returned")?;

    eprintln!("  Session {} created; waiting for idle state...", session_id);

    // Wait for session to be ready
    wait_for_session_idle(http, &ws_id, &lh_id, &session_id).await?;

    eprintln!("  Submitting code...");

    // Submit statement
    let stmt_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements",
        ws_id, lh_id, LIVY_API_VERSION, session_id
    );

    let stmt_body = serde_json::json!({
        "code": exec_code,
        "kind": livy_kind
    });

    let token = auth::get_fabric_token()?;
    let stmt_resp = http
        .post(&stmt_url)
        .bearer_auth(&token)
        .json(&stmt_body)
        .send()
        .await
        .context("Failed to submit statement")?;

    let stmt: serde_json::Value = stmt_resp.json().await?;
    let stmt_id = stmt
        .get("id")
        .and_then(|v| v.as_u64())
        .context("No statement ID returned")?;

    // Poll for statement result
    let result = poll_statement(http, &ws_id, &lh_id, &session_id, stmt_id).await?;

    // Print output
    let output_status = result
        .pointer("/output/status")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");

    if output_status == "ok" {
        if let Some(text) = result.pointer("/output/data/text~1plain").and_then(|v| v.as_str()) {
            println!("{}", text);
        }
    } else {
        // Error output
        let ename = result.pointer("/output/ename").and_then(|v| v.as_str()).unwrap_or("");
        let evalue = result.pointer("/output/evalue").and_then(|v| v.as_str()).unwrap_or("");
        eprintln!("  Error: {} {}", ename, evalue);
        if let Some(traceback) = result.pointer("/output/traceback").and_then(|v| v.as_array()) {
            for line in traceback {
                if let Some(s) = line.as_str() {
                    eprintln!("  {}", s);
                }
            }
        }
    }

    // Clean up the Livy session
    let _ = cleanup_session(http, &ws_id, &lh_id, &session_id).await;

    Ok(())
}


/// Delete a Livy session as best-effort cleanup.
///
/// HTTP failures are not returned. A transport error or non-success status is reported
/// as a warning on stderr, because the session may still be running. "Session cleaned up"
/// is printed only when the DELETE request succeeded.
///
/// # Errors
/// Returns an error only if an access token cannot be obtained.
async fn cleanup_session(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    session_id: &str,
) -> Result<()> {
    let token = auth::get_fabric_token()?;
    let url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
        ws_id, lh_id, LIVY_API_VERSION, session_id
    );

    match http.delete(&url).bearer_auth(&token).send().await {
        Ok(resp) if resp.status().is_success() => eprintln!("  Session cleaned up"),
        Ok(resp) => eprintln!(
            "  Warning: failed to clean up Livy session {} ({})",
            session_id,
            resp.status()
        ),
        Err(err) => eprintln!(
            "  Warning: failed to clean up Livy session {}: {}",
            session_id, err
        ),
    }
    Ok(())
}


/// Wait for a Livy session to reach 'idle' state.
async fn wait_for_session_idle(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    session_id: &str,
) -> Result<()> {
    let check_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}",
        ws_id, lh_id, LIVY_API_VERSION, session_id
    );

    for i in 0..120 {
        let token = auth::get_fabric_token()?;
        let resp = http
            .get(&check_url)
            .bearer_auth(&token)
            .send()
            .await?;

        let session: serde_json::Value = resp.json().await?;
        let state = session.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");

        match state {
            "idle" => return Ok(()),
            "starting" | "not_started" => {
                if i % 6 == 0 && i > 0 {
                    eprintln!("  Still starting... ({}s)", i * 5);
                }
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
            "dead" | "error" | "killed" => {
                anyhow::bail!("Session entered {} state", state);
            }
            _ => {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }

    anyhow::bail!("Session did not become idle within 10 minutes")
}


/// Poll a Livy statement until it reaches a terminal state.
///
/// Polls every 2 seconds, up to 120 times (about 4 minutes). The statement JSON is
/// returned for `available`, `error`, `cancelled`, and `cancelling`. The caller
/// inspects `output` to distinguish success from failure. Any other state is treated
/// as in progress. HTTP 429 and 5xx responses are retried within the same budget.
///
/// # Errors
/// Fails if a token cannot be obtained, a request cannot be sent, the service returns
/// any other non-success status, the response is not JSON, or the statement does not
/// settle within the polling budget.
async fn poll_statement(
    http: &Client,
    ws_id: &str,
    lh_id: &str,
    session_id: &str,
    stmt_id: u64,
) -> Result<serde_json::Value> {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_POLLS: u64 = 120;

    let stmt_url = format!(
        "https://api.fabric.microsoft.com/v1/workspaces/{}/lakehouses/{}/livyApi/versions/{}/sessions/{}/statements/{}",
        ws_id, lh_id, LIVY_API_VERSION, session_id, stmt_id
    );

    for _ in 0..MAX_POLLS {
        let token = auth::get_fabric_token()?;
        let resp = http
            .get(&stmt_url)
            .bearer_auth(&token)
            .send()
            .await
            .context("Failed to query statement state")?;

        let status = resp.status();
        if status.as_u16() == 429 || status.is_server_error() {
            // Throttling / transient server failure: keep polling within the same budget.
            tokio::time::sleep(POLL_INTERVAL).await;
            continue;
        }
        if !status.is_success() {
            // Without this check an error body has no "state" and we'd spin until timeout.
            let body = resp.text().await.unwrap_or_default();
            anyhow::bail!("Failed to query statement ({}): {}", status, body);
        }

        let result: serde_json::Value = resp
            .json()
            .await
            .context("Failed to parse statement state")?;
        let state = result.get("state").and_then(|v| v.as_str()).unwrap_or("unknown");

        if matches!(state, "available" | "error" | "cancelled" | "cancelling") {
            return Ok(result);
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }

    anyhow::bail!(
        "Statement did not complete within {} seconds",
        MAX_POLLS * POLL_INTERVAL.as_secs()
    )
}


/// Detect the notebook's kernel name from its `ipynb` metadata.
///
/// Reads `metadata.kernel_info.name`, then `metadata.kernelspec.name`, and returns
/// `"jupyter"` when neither is a string. The definition part whose path contains
/// `notebook-content` or ends in `.ipynb` is used, falling back to the first part.
///
/// # Errors
/// Fails if the definition cannot be fetched, has no parts, or its payload cannot be
/// base64-decoded or parsed as JSON.
async fn detect_kernel(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
    let def = client::get_definition(http, ws_id, item_id, "ipynb").await?;
    let parts = def.definition.context("No definition")?.parts;
    let part = parts
        .iter()
        .find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
        .or_else(|| parts.first())
        .context("No parts")?;

    let decoded = BASE64
        .decode(&part.payload)
        .context("Failed to decode base64 payload")?;
    let ipynb: serde_json::Value =
        serde_json::from_slice(&decoded).context("Failed to parse ipynb JSON")?;

    // Check each location independently so a non-string `kernel_info.name` (e.g. null)
    // does not hide a valid `kernelspec.name`.
    let kernel = ["/metadata/kernel_info/name", "/metadata/kernelspec/name"]
        .iter()
        .find_map(|ptr| ipynb.pointer(ptr).and_then(|v| v.as_str()))
        .unwrap_or("jupyter")
        .to_string();

    Ok(kernel)
}


/// Detect the default lakehouse ID from notebook metadata.
async fn detect_lakehouse(http: &Client, ws_id: &str, item_id: &str) -> Result<String> {
    let def = client::get_definition(http, ws_id, item_id, "ipynb").await?;
    let parts = def.definition.context("No definition")?.parts;
    let part = parts
        .iter()
        .find(|p| p.path.contains("notebook-content") || p.path.ends_with(".ipynb"))
        .or_else(|| parts.first())
        .context("No parts")?;

    let decoded = BASE64.decode(&part.payload)?;
    let ipynb: serde_json::Value = serde_json::from_slice(&decoded)?;

    ipynb
        .pointer("/metadata/dependencies/lakehouse/default_lakehouse")
        .and_then(|v| v.as_str())
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .context("No default_lakehouse in notebook metadata")
}


// #endregion