use std::{
collections::{BTreeMap, HashMap},
sync::LazyLock,
};
use actix_web::{
HttpRequest, Responder, Result,
rt::spawn,
web::{Bytes, Data, Payload, Query},
};
use actix_ws::{AggregatedMessage, Session, handle};
use bincode::{config::standard, decode_from_slice, encode_to_vec};
use futures_util::StreamExt as _;
use libbarto::{
BartoCli, BartosToBartoCli, CliUpdateKind, ClientData, Garuda, ListOutput,
OffsetDataTimeWrapper, OutputTableName, Pacman, UpdateKind, UuidWrapper,
};
use regex::Regex;
use sqlx::{Column, MySqlPool, Row};
use time::{
OffsetDateTime,
macros::{offset, time},
};
use tokio::{select, sync::Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace};
use uuid::Uuid;
use vergen_pretty::{Pretty, PrettyExt, vergen_pretty_env};
use crate::{common::Clients, config::Config, endpoints::insecure::Name};
/// Matches a `Packages (<count>) <rest of line>` line.
///
/// Group 1 is the numeric count, group 2 is everything after it on the same line
/// (split on whitespace into package names by `pacman_filter`).
static PACMAN_PACKAGES_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r"Packages \((\d+)\) (.*)";
    Regex::new(pattern).expect("failed to create pacman packages update regex")
});
/// Matches a `Total Download Size: <n.nn> MiB` line; group 1 is the decimal number.
static PACMAN_DOWNLOAD_SIZE_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r"Total Download Size:[ ]+(\d+\.\d+) MiB";
    Regex::new(pattern).expect("failed to create pacman download size regex")
});
/// Matches a `Total Installed Size: <n.nn> MiB` line; group 1 is the decimal number.
static PACMAN_INSTALL_SIZE_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r"Total Installed Size:[ ]+(\d+\.\d+) MiB";
    Regex::new(pattern).expect("failed to create pacman install size regex")
});
/// Matches a `Net Upgrade Size: <n.nn> MiB` line; group 1 is the decimal number.
///
/// The number may carry a leading `-`: a net upgrade size is a difference and is negative
/// when the upgraded packages take less space than the ones they replace. Without the
/// optional sign such lines would not match at all and the size would be reported as zero.
/// The captured text (including the sign) is parsed with `str::parse::<f64>` by the filters.
static NET_UPGRADE_SIZE_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"Net Upgrade Size:[ ]+(-?\d+\.\d+) MiB")
        .expect("failed to create net upgrade size regex")
});
/// Matches one `<repo>/<package> <field> <field> <size> MiB <size> MiB` line where the
/// repository is `core`, `extra`, `multilib` or starts with `cachyos-`.
///
/// Group 1 is the repository and group 2 the package name (the only group read by
/// `cachyos_filter`); groups 3-6 are the remaining whitespace-separated columns.
static CACHYOS_UPDATE_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern =
        r"(cachyos-.*|core|extra|multilib)\/([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+(.+ MiB)\s+(.+ MiB)";
    Regex::new(pattern).expect("failed to create cachyos-update regex")
});
/// Matches one `<repo>/<package> <old> <new> <size> MiB <size> MiB` line where the
/// repository is `chaotic-aur`, `core`, `extra` or `multilib`.
///
/// Groups, as consumed by `garuda_filter`: 1 channel, 2 package, 3 old version,
/// 4 new version, 5 size change, 6 download size.
static GARUDA_UPDATE_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern =
        r"(chaotic-aur|core|extra|multilib)\/([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+(.+ MiB)\s+(.+ MiB)";
    Regex::new(pattern).expect("failed to create garuda-update regex")
});
/// Websocket endpoint for CLI connections.
///
/// Upgrades `request` to a websocket with `actix_ws::handle` and spawns a task that
/// services the connection until one of the following happens:
///
/// * the shared [`CancellationToken`] is cancelled,
/// * the peer sends a close frame,
/// * the message stream yields an error, or
/// * the message stream ends (peer went away without a close frame).
///
/// Binary frames are handed to [`handle_binary`]; an error returned from it is logged and
/// the connection stays open. Text, ping and pong frames are logged as unexpected.
///
/// # Errors
///
/// Returns the error from `actix_ws::handle` if the upgrade is rejected.
pub(crate) async fn cli(
    request: HttpRequest,
    body: Payload,
    name: Query<Name>,
    token: Data<CancellationToken>,
    config: Data<Config>,
    pool: Data<MySqlPool>,
    clients_mutex: Data<Mutex<Clients>>,
) -> Result<impl Responder> {
    let describe = name.describe(&request);
    info!("cli connection from '{describe}'");
    let ws_token = token.get_ref().clone();
    let (response, session, msg_stream) = handle(&request, body)?;
    let mut ws_session = session.clone();
    let mut agms = msg_stream.aggregate_continuations();

    let _handle = spawn(async move {
        loop {
            select! {
                () = ws_token.cancelled() => {
                    trace!("cancellation token triggered, closing websocket");
                    let _ = ws_session.close(None).await;
                    break;
                }
                res_opt = agms.next() => {
                    match res_opt {
                        Some(Ok(msg)) => match msg {
                            AggregatedMessage::Text(_byte_string) => {
                                error!("unexpected text message");
                            }
                            AggregatedMessage::Binary(bytes) => {
                                if let Err(e) = handle_binary(
                                    bytes,
                                    &mut ws_session,
                                    config.as_ref(),
                                    pool.as_ref(),
                                    clients_mutex.clone(),
                                )
                                .await
                                {
                                    error!("{e}");
                                }
                            }
                            AggregatedMessage::Ping(_bytes) => error!("unexpected ping message"),
                            AggregatedMessage::Pong(_bytes) => error!("unexpected pong message"),
                            AggregatedMessage::Close(close_reason) => {
                                trace!("handling close message");
                                if let Some(cr) = &close_reason {
                                    let code = u16::from(cr.code);
                                    if let Some(desc) = &cr.description {
                                        trace!("close reason: code={code} reason={desc}");
                                    } else {
                                        trace!("close reason: code={code} no reason given");
                                    }
                                } else {
                                    trace!("close reason: none");
                                }
                                break;
                            }
                        },
                        // A stream error means the connection can no longer be trusted;
                        // stop instead of polling a broken stream again.
                        Some(Err(e)) => {
                            error!("websocket stream error: {e}");
                            break;
                        }
                        // The stream is exhausted. Polling it again would return `None`
                        // immediately, turning this loop into a busy spin until the
                        // cancellation token fires.
                        None => {
                            trace!("websocket stream ended");
                            break;
                        }
                    }
                }
            }
        }
        info!("websocket disconnected '{describe}'");
        let _ = session.close(None).await;
    });

    Ok(response)
}
async fn handle_binary(
bytes: Bytes,
session: &mut Session,
config: &Config,
pool: &MySqlPool,
clients_mutex: Data<Mutex<Clients>>,
) -> anyhow::Result<()> {
match decode_from_slice(&bytes, standard()) {
Err(e) => error!("unable to decode binary message: {e}"),
Ok((msg, _)) => match msg {
BartoCli::Info { json } => {
info!("received info message");
let pretty = Pretty::builder().env(vergen_pretty_env!());
let btbc: BartosToBartoCli = if json {
let new_pretty = pretty.flatten(true);
BartosToBartoCli::InfoJson(serde_json::to_string(&new_pretty.build())?)
} else {
let pretty_ext = PrettyExt::from(pretty.build());
BartosToBartoCli::Info(pretty_ext)
};
let encoded = encode_to_vec(&btbc, standard())?;
session.binary(encoded).await?;
}
BartoCli::Updates { name, kind } => {
let msg = match kind {
CliUpdateKind::Garuda => {
info!("received updates message for '{name}' (garuda)");
let updates = garuda_update_data(&name, config, pool).await?;
BartosToBartoCli::Updates(UpdateKind::Garuda(updates))
}
CliUpdateKind::Pacman => {
info!("received updates message for '{name}' (pacman)");
let updates = pacman_update_data(&name, config, pool).await?;
BartosToBartoCli::Updates(UpdateKind::Pacman(updates))
}
CliUpdateKind::Cachyos => {
info!("received updates message for '{name}' (cachyos)");
let updates = cachyos_update_data(&name, config, pool).await?;
BartosToBartoCli::Updates(UpdateKind::Cachyos(updates))
}
CliUpdateKind::Other => {
info!("received updates message for '{name}' (other)");
BartosToBartoCli::Updates(UpdateKind::Other)
}
};
let encoded = encode_to_vec(&msg, standard())?;
session.binary(encoded).await?;
}
BartoCli::Cleanup => {
info!("received cleanup message");
let counts = delete_data(config, pool).await?;
info!("deleted {} output rows", counts.0);
info!("deleted {} exit status rows", counts.1);
let cleanup = BartosToBartoCli::Cleanup(counts);
let encoded = encode_to_vec(&cleanup, standard())?;
session.binary(encoded).await?;
}
BartoCli::Clients => {
info!("received clients message");
let clients = clients_mutex.lock().await;
let mapped_clients = clients
.clients()
.iter()
.map(|c| (UuidWrapper(*c.0), c.1.clone()))
.collect::<HashMap<UuidWrapper, ClientData>>();
let clients = BartosToBartoCli::Clients(mapped_clients);
let encoded = encode_to_vec(&clients, standard())?;
session.binary(encoded).await?;
}
BartoCli::Query { query } => {
info!("received query message");
let results = sqlx::query(&query).fetch_all(pool).await?;
let mut map = BTreeMap::new();
for (i, row) in results.iter().enumerate() {
let mut row_map = BTreeMap::new();
for (j, column) in row.columns().iter().enumerate() {
if let Ok(value) = row.try_get::<u64, usize>(j) {
let _old = row_map.insert(column.name().to_string(), value.to_string());
} else if let Ok(value) = row.try_get::<OffsetDateTime, usize>(j) {
let value = value.to_offset(offset!(-4));
let _old = row_map.insert(column.name().to_string(), value.to_string());
} else if let Ok(value) = row.try_get::<String, usize>(j) {
let _old = row_map.insert(column.name().to_string(), value);
} else if let Ok(value) = row.try_get::<Uuid, usize>(j) {
let _old = row_map.insert(column.name().to_string(), value.to_string());
}
}
let _old = map.insert(i, row_map);
}
info!("query returned {} rows", map.len());
let query_result = BartosToBartoCli::Query(map);
let encoded = encode_to_vec(&query_result, standard())?;
session.binary(encoded).await?;
}
BartoCli::List { name, cmd_name } => {
info!("received list message for '{name}' (cmd: {cmd_name})");
let list_output = cmd_name_data(pool, &name, &cmd_name).await?;
let msg = BartosToBartoCli::List(list_output);
let encoded = encode_to_vec(&msg, standard())?;
session.binary(encoded).await?;
}
},
}
Ok(())
}
/// Loads the stored output lines for `name` from the configured output table and extracts
/// the Garuda package updates from them.
///
/// # Errors
///
/// Returns an error if the database query fails.
async fn garuda_update_data(
    name: &str,
    config: &Config,
    pool: &MySqlPool,
) -> anyhow::Result<Vec<Garuda>> {
    let lines = match config.mariadb().output_table() {
        OutputTableName::Output => output_data(name, pool).await?,
        OutputTableName::OutputTest => output_test_data(name, pool).await?,
    };
    Ok(garuda_filter(lines))
}
/// Loads the stored output lines for `name` from the configured output table and
/// summarizes them with `pacman_filter`.
///
/// # Errors
///
/// Returns an error if the database query fails.
async fn pacman_update_data(
    name: &str,
    config: &Config,
    pool: &MySqlPool,
) -> anyhow::Result<Pacman> {
    let lines = match config.mariadb().output_table() {
        OutputTableName::Output => output_data(name, pool).await?,
        OutputTableName::OutputTest => output_test_data(name, pool).await?,
    };
    Ok(pacman_filter(&lines))
}
/// Loads the stored output lines for `name` from the configured output table and
/// summarizes them with `cachyos_filter`.
///
/// # Errors
///
/// Returns an error if the database query fails.
async fn cachyos_update_data(
    name: &str,
    config: &Config,
    pool: &MySqlPool,
) -> anyhow::Result<Pacman> {
    let lines = match config.mariadb().output_table() {
        OutputTableName::Output => output_data(name, pool).await?,
        OutputTableName::OutputTest => output_test_data(name, pool).await?,
    };
    Ok(cachyos_filter(&lines))
}
/// Builds one [`Garuda`] entry for every line that matches `GARUDA_UPDATE_RE` and returns
/// the entries sorted by their `Ord` implementation.
///
/// Lines that do not match are skipped. A capture group that did not participate in the
/// match is passed to the builder as an empty string.
fn garuda_filter(data: Vec<String>) -> Vec<Garuda> {
    let mut updates = Vec::new();
    for line in data {
        let Some(caps) = GARUDA_UPDATE_RE.captures(&line) else {
            continue;
        };
        // Text of capture group `idx`, or "" when the group is absent.
        let field = |idx: usize| caps.get(idx).map_or("", |m| m.as_str());
        updates.push(
            Garuda::builder()
                .channel(field(1))
                .package(field(2))
                .old_version(field(3))
                .new_version(field(4))
                .size_change(field(5))
                .download_size(field(6))
                .build(),
        );
    }
    updates.sort();
    updates
}
fn cachyos_filter(data: &[String]) -> Pacman {
let packages = data
.iter()
.filter_map(|s| {
CACHYOS_UPDATE_RE.captures(s).map(|caps| {
caps.get(2)
.map_or("", |m| m.as_str())
.split_whitespace()
.map(ToString::to_string)
.collect::<Vec<String>>()
})
})
.fold(vec![], |mut acc, packages| {
acc.extend(packages);
acc
});
let total_download_size = data
.iter()
.filter_map(|s| {
PACMAN_DOWNLOAD_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
let total_install_size = data
.iter()
.filter_map(|s| {
PACMAN_INSTALL_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
let net_install_size = data
.iter()
.filter_map(|s| {
NET_UPGRADE_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
Pacman::builder()
.update_count(packages.len())
.packages(packages)
.install_size(total_install_size)
.net_size(net_install_size)
.download_size(total_download_size)
.build()
}
fn pacman_filter(data: &[String]) -> Pacman {
let (package_count, packages) = data
.iter()
.filter_map(|s| {
PACMAN_PACKAGES_RE.captures(s).map(|caps| {
(
caps.get(1)
.map_or(0, |m| m.as_str().parse::<usize>().unwrap_or(0)),
caps.get(2)
.map_or("", |m| m.as_str())
.split_whitespace()
.map(ToString::to_string)
.collect::<Vec<String>>(),
)
})
})
.fold((0, vec![]), |mut acc, (count, packages)| {
acc.0 += count;
acc.1.extend(packages);
acc
});
let total_download_size = data
.iter()
.filter_map(|s| {
PACMAN_DOWNLOAD_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
let total_install_size = data
.iter()
.filter_map(|s| {
PACMAN_INSTALL_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
let net_install_size = data
.iter()
.filter_map(|s| {
NET_UPGRADE_SIZE_RE.captures(s).map(|caps| {
caps.get(1)
.map_or(0.0, |m| m.as_str().parse::<f64>().unwrap_or(0.0))
})
})
.sum::<f64>();
Pacman::builder()
.update_count(package_count)
.packages(packages)
.install_size(total_install_size)
.net_size(net_install_size)
.download_size(total_download_size)
.build()
}
/// Returns the `data` column of every `output` row recorded for `name`, ordered by
/// timestamp.
///
/// # Errors
///
/// Returns an error if the query fails.
async fn output_data(name: &str, pool: &MySqlPool) -> anyhow::Result<Vec<String>> {
    let rows = sqlx::query!(
        r#"SELECT output.data FROM output WHERE output.bartoc_name = ? order by timestamp"#,
        name,
    )
    .fetch_all(pool)
    .await?;

    let mut lines = Vec::with_capacity(rows.len());
    for row in rows {
        lines.push(row.data);
    }
    Ok(lines)
}
/// Returns the `data` column of every `output_test` row recorded for `name`, ordered by
/// timestamp.
///
/// # Errors
///
/// Returns an error if the query fails.
async fn output_test_data(name: &str, pool: &MySqlPool) -> anyhow::Result<Vec<String>> {
    let rows = sqlx::query!(
        r#"SELECT output_test.data FROM output_test WHERE output_test.bartoc_name = ? order by timestamp"#,
        name,
    )
    .fetch_all(pool)
    .await?;

    let mut lines = Vec::with_capacity(rows.len());
    for row in rows {
        lines.push(row.data);
    }
    Ok(lines)
}
/// Deletes old rows from whichever table pair (`output`/`exit_status` or their `_test`
/// counterparts) the configuration selects.
///
/// Returns `(deleted output rows, deleted exit status rows)`.
///
/// # Errors
///
/// Returns an error if the cutoff time cannot be determined or a delete statement fails.
async fn delete_data(config: &Config, pool: &MySqlPool) -> anyhow::Result<(u64, u64)> {
    let counts = match config.mariadb().output_table() {
        OutputTableName::Output => delete_output_data(pool).await?,
        OutputTableName::OutputTest => delete_output_test_data(pool).await?,
    };
    Ok(counts)
}
/// Deletes every `output` and `exit_status` row whose timestamp is earlier than the value
/// returned by [`midnight`].
///
/// Returns `(deleted output rows, deleted exit status rows)`. The two statements run one
/// after the other on `pool`, not inside a transaction.
///
/// # Errors
///
/// Returns an error if the cutoff time cannot be determined or either statement fails.
async fn delete_output_data(pool: &MySqlPool) -> anyhow::Result<(u64, u64)> {
    let cutoff = midnight()?;

    let output_result = sqlx::query!("DELETE FROM output WHERE timestamp < ?", cutoff)
        .execute(pool)
        .await?;
    let exit_status_result = sqlx::query!("DELETE FROM exit_status WHERE timestamp < ?", cutoff)
        .execute(pool)
        .await?;

    Ok((
        output_result.rows_affected(),
        exit_status_result.rows_affected(),
    ))
}
/// Deletes every `output_test` and `exit_status_test` row whose timestamp is earlier than
/// the value returned by [`midnight`].
///
/// Returns `(deleted output rows, deleted exit status rows)`. The two statements run one
/// after the other on `pool`, not inside a transaction.
///
/// # Errors
///
/// Returns an error if the cutoff time cannot be determined or either statement fails.
async fn delete_output_test_data(pool: &MySqlPool) -> anyhow::Result<(u64, u64)> {
    let cutoff = midnight()?;

    let output_result = sqlx::query!("DELETE FROM output_test WHERE timestamp < ?", cutoff)
        .execute(pool)
        .await?;
    let exit_status_result =
        sqlx::query!("DELETE FROM exit_status_test WHERE timestamp < ?", cutoff)
            .execute(pool)
            .await?;

    Ok((
        output_result.rows_affected(),
        exit_status_result.rows_affected(),
    ))
}
/// Returns today's date at 00:00:00 in the local UTC offset and logs it as the deletion
/// cutoff.
///
/// # Errors
///
/// Returns an error if `OffsetDateTime::now_local` cannot determine the local offset.
// NOTE(review): `now_local` may refuse to determine the offset in some multi-threaded
// environments — confirm this succeeds under the server's runtime.
fn midnight() -> anyhow::Result<OffsetDateTime> {
    let midnight = OffsetDateTime::now_local().map(|now| now.replace_time(time!(0:0:0)))?;
    info!("deleting records older than: {midnight}");
    Ok(midnight)
}
/// Lists the recorded output of command `cmd_name` for client `name`, joined with the
/// command's exit status and ordered by output timestamp.
///
/// # Errors
///
/// Returns an error if the query fails.
// NOTE(review): the query is a RIGHT JOIN, but the WHERE clause filters on `output`
// columns, so exit-status rows without matching output presumably never appear — confirm
// that is intended.
async fn cmd_name_data(
    pool: &MySqlPool,
    name: &str,
    cmd_name: &str,
) -> anyhow::Result<Vec<ListOutput>> {
    // The query text is kept exactly as written, including its line layout.
    let rows = sqlx::query!(
        "SELECT
output.timestamp,
output.cmd_name,
output.data,
exit_status.exit_code,
exit_status.success
FROM
output
RIGHT JOIN
exit_status ON exit_status.cmd_uuid = output.cmd_uuid
WHERE
output.bartoc_name = ?
AND
output.cmd_name = ?
ORDER BY
output.timestamp",
        name,
        cmd_name
    )
    .fetch_all(pool)
    .await?;

    let mut listing = Vec::with_capacity(rows.len());
    for row in rows {
        let entry = ListOutput::builder()
            .maybe_timestamp(row.timestamp.map(OffsetDataTimeWrapper))
            .maybe_data(row.data)
            .exit_code(row.exit_code)
            .success(row.success)
            .build();
        listing.push(entry);
    }
    Ok(listing)
}
#[cfg(test)]
mod test {
    use crate::endpoints::insecure::cli::GARUDA_UPDATE_RE;
    use anyhow::Result;

    /// A line that must not be recognized as a package update.
    const NO_MATCH: &str = "this is not a match";

    /// Unrelated text is rejected by the garuda update regex.
    #[test]
    fn test_garuda_update_re_no_match() {
        assert!(GARUDA_UPDATE_RE.find(NO_MATCH).is_none());
    }

    /// A well-formed update line matches and yields all six capture groups.
    #[test]
    fn test_package_update_re() -> Result<()> {
        let text = "extra/kio 6.19.0-1 6.19.0-2 0.00 MiB 3.59 MiB";
        assert!(GARUDA_UPDATE_RE.is_match(text));

        let caps = GARUDA_UPDATE_RE
            .captures(text)
            .ok_or_else(|| anyhow::anyhow!("failed to capture"))?;

        // Expected text of capture groups 1 through 6, in order.
        let expected = [
            "extra", "kio", "6.19.0-1", "6.19.0-2", "0.00 MiB", "3.59 MiB",
        ];
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(caps.get(idx + 1).map(|m| m.as_str()), Some(*want));
        }
        Ok(())
    }
}