use crate::client::OrdinaryApiClient;
use anyhow::bail;
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD as b64};
use bytes::Bytes;
use ordinary_config::OrdinaryConfig;
use ordinary_monitor::LogFileMetadata;
use ordinary_monitor::client::{IndexDbs, OrdinaryMonitorClient};
use tracing::Instrument;
use uuid::Uuid;
pub fn logs_local_metadata(proj_path: &str, is_app: bool) -> anyhow::Result<Vec<LogFileMetadata>> {
let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));
let monitor_client = if let Some(config) = &config {
OrdinaryMonitorClient::new(&config.domain, vec![IndexDbs::Tantivy])?
} else {
OrdinaryMonitorClient::new("system", vec![IndexDbs::Tantivy])?
};
monitor_client.metadata()
}
async fn fetch_log_file(
api_client: &OrdinaryApiClient<'_>,
correlation_id: &str,
params: &mut Vec<(&str, String)>,
name: &str,
is_app: bool,
) -> anyhow::Result<Bytes> {
tracing::info!("fetching");
let access_token = api_client
.get_access(None, Some(correlation_id.to_string()))
.await?;
let file_req = api_client
.client
.get(format!(
"{}/v1{}/logs/files/{name}",
api_client.addr,
if is_app { "/app" } else { "" }
))
.query(¶ms)
.header(
"Authorization",
format!("Bearer {}", b64.encode(&access_token)),
)
.header("x-correlation-id", correlation_id);
let file_res = file_req.send().await?;
let status = file_res.status();
if status.is_success() {
let file_bytes = file_res.bytes().await?;
tracing::info!("fetched");
Ok(file_bytes)
} else {
let message = file_res.text().await?;
tracing::error!(%status, %message, "failed request");
bail!("bad request");
}
}
pub async fn logs_remote_metadata(
api_client: &OrdinaryApiClient<'_>,
proj_path: &str,
correlation_id: Option<String>,
is_app: bool,
) -> anyhow::Result<Vec<LogFileMetadata>> {
tracing::info!("fetching logs metadata...");
let correlation_id = correlation_id.unwrap_or(
api_client
.correlation_id
.unwrap_or(Uuid::new_v4())
.to_string(),
);
tracing::info!("before");
let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));
tracing::info!("after");
let span = tracing::info_span!("metadata");
async {
let params = vec![(
"d",
config.map_or("system".to_string(), |c| c.domain.clone()),
)];
let access_token = api_client
.get_access(None, Some(correlation_id.clone()))
.await?;
let metadata_req = api_client
.client
.get(format!(
"{}/v1{}/logs/metadata",
api_client.addr,
if is_app { "/app" } else { "" },
))
.query(¶ms)
.header(
"Authorization",
format!("Bearer {}", b64.encode(&access_token)),
)
.header("x-correlation-id", correlation_id.clone());
let metadata_res = metadata_req.send().await?;
tracing::info!("fetched");
let mut log_files_metadata: Vec<LogFileMetadata> = metadata_res.json().await?;
log_files_metadata.sort_by(|a, b| a.name.cmp(&b.name));
anyhow::Ok(log_files_metadata)
}
.instrument(span)
.await
}
#[allow(clippy::missing_panics_doc, clippy::ref_option)]
pub async fn logs_sync(
api_client: &OrdinaryApiClient<'_>,
proj_path: &str,
force: Option<bool>,
file: Option<&str>,
is_app: bool,
) -> anyhow::Result<()> {
tracing::info!("syncing logs...");
let correlation_id = api_client
.correlation_id
.unwrap_or(Uuid::new_v4())
.to_string();
let config = is_app.then(|| OrdinaryConfig::get(proj_path).expect("failed to get config"));
let monitor_client = if let Some(config) = &config {
OrdinaryMonitorClient::new(&config.domain, vec![IndexDbs::Tantivy])?
} else {
OrdinaryMonitorClient::new("system", vec![IndexDbs::Tantivy])?
};
if let Some(name) = file {
let mut params = vec![(
"d",
config
.as_ref()
.map_or("system".to_string(), |c| c.domain.clone()),
)];
let span = tracing::info_span!("file", %name, cursor = tracing::field::Empty);
async {
let span = tracing::info_span!("sync");
let file_bytes = async {
fetch_log_file(api_client, &correlation_id, &mut params, name, is_app).await
}
.instrument(span)
.await?;
monitor_client.store(name, file_bytes.as_ref(), true)?;
monitor_client.tantivy_index(name, &None, true)?;
anyhow::Ok(())
}
.instrument(span)
.await?;
return Ok(());
}
let remote_files_metadata =
logs_remote_metadata(api_client, proj_path, Some(correlation_id.clone()), is_app).await?;
for remote_file_metadata in &remote_files_metadata {
let mut params = vec![(
"d",
config
.as_ref()
.map_or("system".to_string(), |c| c.domain.clone()),
)];
let name = remote_file_metadata.name.clone();
let (cursor, force) = if force == Some(true) {
(Some(0), true)
} else {
(monitor_client.should_refetch(remote_file_metadata)?, false)
};
let span = tracing::info_span!("file", %name, cursor = tracing::field::Empty);
if let Some(c) = cursor {
if c > 0 {
params.push(("c", c.to_string()));
span.record("cursor", c);
}
} else {
tracing::info!("skipping");
continue;
}
async {
let span = tracing::info_span!("sync");
let file_bytes = async {
fetch_log_file(api_client, &correlation_id, &mut params, &name, is_app).await
}
.instrument(span)
.await?;
if force {
monitor_client.store(&name, file_bytes.as_ref(), true)?;
monitor_client.tantivy_index(&name, &None, true)?;
} else {
monitor_client.store(&name, file_bytes.as_ref(), false)?;
monitor_client.tantivy_index(&name, &cursor, false)?;
}
anyhow::Ok(())
}
.instrument(span)
.await?;
}
Ok(())
}