ordinary-api 0.6.0-pre.13

API server for Ordinary
Documentation
// Copyright (C) 2026 Ordinary Labs, LLC.
//
// SPDX-License-Identifier: AGPL-3.0-only

use crate::client::OrdinaryApiClient;
use anyhow::bail;
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD as b64};
use bytes::Bytes;
use ordinary_config::OrdinaryConfig;
use ordinary_monitor::LogFileMetadata;
use ordinary_monitor::client::{IndexDbs, OrdinaryMonitorClient};
use tracing::Instrument;
use uuid::Uuid;

pub fn logs_local_metadata(proj_path: &str, is_app: bool) -> anyhow::Result<Vec<LogFileMetadata>> {
    let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));

    let monitor_client = if let Some(config) = &config {
        OrdinaryMonitorClient::new(&config.domain, vec![IndexDbs::Tantivy])?
    } else {
        OrdinaryMonitorClient::new("system", vec![IndexDbs::Tantivy])?
    };

    monitor_client.metadata()
}

async fn fetch_log_file(
    api_client: &OrdinaryApiClient<'_>,
    correlation_id: &str,
    params: &mut Vec<(&str, String)>,
    name: &str,
    is_app: bool,
) -> anyhow::Result<Bytes> {
    tracing::info!("fetching");

    let access_token = api_client
        .get_access(None, Some(correlation_id.to_string()))
        .await?;

    let file_req = api_client
        .client
        .get(format!(
            "{}/v1{}/logs/files/{name}",
            api_client.addr,
            if is_app { "/app" } else { "" }
        ))
        .query(&params)
        .header(
            "Authorization",
            format!("Bearer {}", b64.encode(&access_token)),
        )
        .header("x-correlation-id", correlation_id);

    let file_res = file_req.send().await?;
    let status = file_res.status();

    if status.is_success() {
        let file_bytes = file_res.bytes().await?;
        tracing::info!("fetched");
        Ok(file_bytes)
    } else {
        let message = file_res.text().await?;
        tracing::error!(%status, %message, "failed request");
        bail!("bad request");
    }
}

pub async fn logs_remote_metadata(
    api_client: &OrdinaryApiClient<'_>,
    proj_path: &str,
    correlation_id: Option<String>,
    is_app: bool,
) -> anyhow::Result<Vec<LogFileMetadata>> {
    tracing::info!("fetching logs metadata...");

    let correlation_id = correlation_id.unwrap_or(
        api_client
            .correlation_id
            .unwrap_or(Uuid::new_v4())
            .to_string(),
    );

    tracing::info!("before");

    let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));

    tracing::info!("after");

    let span = tracing::info_span!("metadata");

    async {
        let params = vec![(
            "d",
            config.map_or("system".to_string(), |c| c.domain.clone()),
        )];

        let access_token = api_client
            .get_access(None, Some(correlation_id.clone()))
            .await?;

        let metadata_req = api_client
            .client
            .get(format!(
                "{}/v1{}/logs/metadata",
                api_client.addr,
                if is_app { "/app" } else { "" },
            ))
            .query(&params)
            .header(
                "Authorization",
                format!("Bearer {}", b64.encode(&access_token)),
            )
            .header("x-correlation-id", correlation_id.clone());

        let metadata_res = metadata_req.send().await?;
        tracing::info!("fetched");

        let mut log_files_metadata: Vec<LogFileMetadata> = metadata_res.json().await?;
        log_files_metadata.sort_by(|a, b| a.name.cmp(&b.name));
        anyhow::Ok(log_files_metadata)
    }
    .instrument(span)
    .await
}

#[allow(clippy::missing_panics_doc, clippy::ref_option)]
pub async fn logs_sync(
    api_client: &OrdinaryApiClient<'_>,
    proj_path: &str,
    force: Option<bool>,
    file: Option<&str>,
    is_app: bool,
) -> anyhow::Result<()> {
    tracing::info!("syncing logs...");

    let correlation_id = api_client
        .correlation_id
        .unwrap_or(Uuid::new_v4())
        .to_string();

    let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));
    let monitor_client = if let Some(config) = &config {
        OrdinaryMonitorClient::new(&config.domain, vec![IndexDbs::Tantivy])?
    } else {
        OrdinaryMonitorClient::new("system", vec![IndexDbs::Tantivy])?
    };

    if let Some(name) = file {
        let mut params = vec![(
            "d",
            config
                .as_ref()
                .map_or("system".to_string(), |c| c.domain.clone()),
        )];
        let span = tracing::info_span!("file", %name, cursor = tracing::field::Empty);

        async {
            let span = tracing::info_span!("sync");

            let file_bytes = async {
                fetch_log_file(api_client, &correlation_id, &mut params, name, is_app).await
            }
            .instrument(span)
            .await?;

            monitor_client.store(name, file_bytes.as_ref(), true)?;
            monitor_client.tantivy_index(name, &None, true)?;

            anyhow::Ok(())
        }
        .instrument(span)
        .await?;

        return Ok(());
    }

    let remote_files_metadata =
        logs_remote_metadata(api_client, proj_path, Some(correlation_id.clone()), is_app).await?;

    for remote_file_metadata in &remote_files_metadata {
        let mut params = vec![(
            "d",
            config
                .as_ref()
                .map_or("system".to_string(), |c| c.domain.clone()),
        )];

        let name = remote_file_metadata.name.clone();
        let (cursor, force) = if force == Some(true) {
            (Some(0), true)
        } else {
            (monitor_client.should_refetch(remote_file_metadata)?, false)
        };

        let span = tracing::info_span!("file", %name, cursor = tracing::field::Empty);

        if let Some(c) = cursor {
            if c > 0 {
                params.push(("c", c.to_string()));
                span.record("cursor", c);
            }
        } else {
            tracing::info!("skipping");
            continue;
        }

        async {
            let span = tracing::info_span!("sync");

            let file_bytes = async {
                fetch_log_file(api_client, &correlation_id, &mut params, &name, is_app).await
            }
            .instrument(span)
            .await?;

            if force {
                monitor_client.store(&name, file_bytes.as_ref(), true)?;
                monitor_client.tantivy_index(&name, &None, true)?;
            } else {
                monitor_client.store(&name, file_bytes.as_ref(), false)?;
                monitor_client.tantivy_index(&name, &cursor, false)?;
            }

            anyhow::Ok(())
        }
        .instrument(span)
        .await?;
    }

    Ok(())
}