use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::atomic::Ordering;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use hyphae::MapExt;
#[cfg(not(target_arch = "wasm32"))]
use hyphae::SwitchMapExt;
use hyphae::{Cell, CellImmutable};
#[cfg(not(target_arch = "wasm32"))]
use hyphae::{DedupedExt, interval};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use ts_rs::TS;
use crate::{
report::{ReportContext, ReportHandler},
wire::{MEvent, WrappedItem},
};
/// Minimal descriptor of an item: its id, its type name, and an optional name.
///
/// Serialized with camelCase keys (so `item_type` becomes `itemType`) and
/// exported to TypeScript through `ts_rs`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct ItemStub {
/// Identifier of the item.
pub id: Arc<str>,
/// Name of the item's type.
pub item_type: String,
/// Optional name of the item; `None` when no name is available.
pub name: Option<String>,
}
/// Three lists of [`ItemStub`]s grouped as changed, added, and removed.
///
/// `Default` yields three empty lists. Serialized with camelCase keys and
/// exported to TypeScript through `ts_rs`.
// NOTE(review): presumably the result of comparing two entity snapshots — the
// code that fills these lists is not in this file; confirm.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct EntitySnapshotDifferenceData {
/// Items listed as changed.
pub changed: Vec<ItemStub>,
/// Items listed as added.
pub added: Vec<ItemStub>,
/// Items listed as removed.
pub removed: Vec<ItemStub>,
}
/// Report parameters: an item type plus a list of item ids.
///
/// Declared through `myko_report` with an output of `Vec<Value>`.
#[myko_macros::myko_report(Vec<Value>)]
pub struct GetItemsByTypeAndIds {
/// Item type name; carried under the key `type` in serialized form.
#[serde(rename = "type")]
pub item_type: String,
/// Ids of the requested items.
pub ids: Vec<Arc<str>>,
}
impl ReportHandler for GetItemsByTypeAndIds {
    type Output = Vec<serde_json::Value>;

    /// Produces an immutable cell holding an empty list.
    ///
    /// This implementation reads neither the request fields nor the context.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let no_items: Self::Output = Vec::new();
        Cell::new(Arc::new(no_items)).lock()
    }
}
/// Report parameters naming a parent by type and id.
///
/// Declared through `myko_report` with an output of `Vec<ItemStub>`.
// NOTE(review): how this differs from `FullChildEntities` and
// `ChildEntitiesAllTime` is not visible in this file — confirm against the
// real handler.
#[myko_macros::myko_report(Vec<ItemStub>)]
pub struct ChildEntities {
/// Type name of the parent.
pub parent_type: String,
/// Id of the parent.
pub parent_id: Arc<str>,
}
impl ReportHandler for ChildEntities {
    type Output = Vec<ItemStub>;

    /// Produces an immutable cell holding an empty list of stubs.
    ///
    /// Neither the parent fields nor the context are consulted here.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let no_children: Self::Output = Vec::new();
        Cell::new(Arc::new(no_children)).lock()
    }
}
/// Report parameters naming a parent by type and id.
///
/// Declared through `myko_report` with an output of `Vec<ItemStub>`.
// NOTE(review): what "full" adds over `ChildEntities` is not visible in this
// file — confirm against the real handler.
#[myko_macros::myko_report(Vec<ItemStub>)]
pub struct FullChildEntities {
/// Type name of the parent.
pub parent_type: String,
/// Id of the parent.
pub parent_id: Arc<str>,
}
impl ReportHandler for FullChildEntities {
    type Output = Vec<ItemStub>;

    /// Produces an immutable cell holding an empty list of stubs.
    ///
    /// Neither the parent fields nor the context are consulted here.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let nothing_found: Self::Output = Vec::new();
        Cell::new(Arc::new(nothing_found)).lock()
    }
}
/// Report parameters naming a parent by type and id.
///
/// Declared through `myko_report` with an output of `Vec<ItemStub>`.
// NOTE(review): the "all time" scope implied by the name is not defined in this
// file — confirm against the real handler.
#[myko_macros::myko_report(Vec<ItemStub>)]
pub struct ChildEntitiesAllTime {
/// Type name of the parent.
pub parent_type: String,
/// Id of the parent.
pub parent_id: Arc<str>,
}
impl ReportHandler for ChildEntitiesAllTime {
    type Output = Vec<ItemStub>;

    /// Produces an immutable cell holding an empty list of stubs.
    ///
    /// Neither the parent fields nor the context are consulted here.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let empty_history: Self::Output = Vec::new();
        Cell::new(Arc::new(empty_history)).lock()
    }
}
/// Report parameters naming a parent by type and id.
///
/// Declared through `myko_report` with an output of
/// `EntitySnapshotDifferenceData`.
#[myko_macros::myko_report(EntitySnapshotDifferenceData)]
pub struct EntitySnapshotDifference {
/// Type name of the parent.
pub parent_type: String,
/// Id of the parent.
pub parent_id: Arc<str>,
}
impl ReportHandler for EntitySnapshotDifference {
    type Output = EntitySnapshotDifferenceData;

    /// Produces an immutable cell holding the default difference value.
    ///
    /// Neither the parent fields nor the context are consulted here.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let empty_diff: Self::Output = Default::default();
        Cell::new(Arc::new(empty_diff)).lock()
    }
}
/// Logging verbosity level.
///
/// Variants are serialized in upper case (`ERROR`, `WARN`, `INFO`, `DEBUG`,
/// `VERBOSE`). The default is [`LogLevel::Info`]. Exported to TypeScript
/// through `ts_rs`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TS, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
#[ts(export)]
pub enum LogLevel {
/// Serialized as `ERROR`.
Error,
/// Serialized as `WARN`.
Warn,
/// Serialized as `INFO`; the value returned by `LogLevel::default()`.
#[default]
Info,
/// Serialized as `DEBUG`.
Debug,
/// Serialized as `VERBOSE`.
Verbose,
}
/// Parameterless report declared through `myko_report` with an output of
/// `Vec<String>`; its handler in this file yields a fixed list of logger names.
#[myko_macros::myko_report(Vec<String>)]
pub struct Loggers {}
impl ReportHandler for Loggers {
    type Output = Vec<String>;

    /// Produces an immutable cell holding a fixed list of five logger names.
    ///
    /// The list is hard-coded; the context is not consulted.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        const LOGGER_NAMES: [&str; 5] = [
            "myko",
            "myko::server",
            "myko::query",
            "myko::command",
            "myko::report",
        ];

        let names: Self::Output = LOGGER_NAMES.iter().map(|name| name.to_string()).collect();
        Cell::new(Arc::new(names)).lock()
    }
}
/// Report parameters naming a server by id.
///
/// Declared through `myko_report` with an output of `LogLevel`.
#[myko_macros::myko_report(LogLevel)]
pub struct ServerLogLevel {
/// Id of the server being asked about.
pub server_id: Arc<str>,
}
impl ReportHandler for ServerLogLevel {
    type Output = LogLevel;

    /// Produces an immutable cell holding `LogLevel::Info`.
    ///
    /// The value is fixed; `server_id` and the context are not consulted.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let reported_level: Self::Output = LogLevel::Info;
        Cell::new(Arc::new(reported_level)).lock()
    }
}
/// Command parameters: a server id and the log level requested for it.
///
/// Declared through `myko_command` with a result type of `bool`.
#[myko_macros::myko_command(bool)]
pub struct SetLogLevel {
/// Id of the server the request targets.
pub server_id: Arc<str>,
/// Requested log level.
pub level: LogLevel,
}
impl crate::command::CommandHandler for SetLogLevel {
    /// Reports success unconditionally.
    ///
    /// This implementation inspects neither the command's fields nor the
    /// context; it always returns `Ok(true)`.
    fn execute(
        self,
        _ctx: crate::command::CommandContext,
    ) -> Result<bool, crate::command::CommandError> {
        let acknowledged = true;
        Ok(acknowledged)
    }
}
/// Report parameters naming a peer by id.
///
/// Declared through `myko_report` with an output of `i64`. The handlers in
/// this file yield the peer's ping time in milliseconds, or `-1` when no value
/// is available (always `-1` on `wasm32`).
#[myko_macros::myko_report(i64)]
pub struct PeerAlive {
/// Id of the peer to look up.
pub peer_id: Arc<str>,
}
#[cfg(not(target_arch = "wasm32"))]
impl ReportHandler for PeerAlive {
    type Output = i64;

    /// Tracks the ping time (in milliseconds) of the peer named by `peer_id`.
    ///
    /// Each emission of `ctx.peer_clients_tick()` triggers a fresh lookup of
    /// the peer client:
    /// * no client for `peer_id` yields a fixed cell holding `-1`;
    /// * otherwise the client's `ping_ms()` cell is mapped to an `i64`, using
    ///   `-1` when the ping is `None` and clamping values above `i64::MAX`.
    fn compute(&self, ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let target_peer = self.peer_id.clone();
        // A separate handle is moved into the closure; `ctx` itself is still
        // needed to obtain the tick cell below.
        let lookup_ctx = ctx.clone();

        ctx.peer_clients_tick().switch_map(move |_| {
            match lookup_ctx.peer_client(target_peer.as_ref()) {
                Some(client) => client.ping_ms().map(|latency| {
                    let millis = latency
                        .map(|ms| ms.min(i64::MAX as u64) as i64)
                        .unwrap_or(-1);
                    Arc::new(millis)
                }),
                None => Cell::new(Arc::new(-1)).lock(),
            }
        })
    }
}
#[cfg(target_arch = "wasm32")]
impl ReportHandler for PeerAlive {
    type Output = i64;

    /// `wasm32` variant: produces an immutable cell holding `-1`.
    ///
    /// No peer lookup is performed; the context is not consulted.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let unavailable: Self::Output = -1;
        Cell::new(Arc::new(unavailable)).lock()
    }
}
/// Pairs an id with an [`MEvent`](crate::wire::MEvent).
///
/// Serialized with camelCase keys and exported to TypeScript through `ts_rs`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct EventContainer {
/// Identifier attached to the contained event.
pub id: Arc<str>,
/// The wrapped event.
pub event: crate::wire::MEvent,
}
/// Report parameters naming a transaction by id.
///
/// Declared through `myko_report` with an output of `Vec<MEvent>`.
#[myko_macros::myko_report(Vec<MEvent>)]
pub struct EventsForTransaction {
/// Id of the transaction whose events are requested.
pub transaction_id: String,
}
impl ReportHandler for EventsForTransaction {
    type Output = Vec<crate::wire::MEvent>;

    /// Produces an immutable cell holding an empty list of events.
    ///
    /// `transaction_id` and the context are not consulted here.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let no_events: Self::Output = Vec::new();
        Cell::new(Arc::new(no_events)).lock()
    }
}
/// Command parameters for a bulk import: items to set and items to delete.
///
/// Declared through `myko_command` with a result type of `usize`. The handler
/// in this file turns `items` into `SET` events and `delete_items` into `DEL`
/// events, emitted as one batch.
#[myko_macros::myko_command(usize)]
pub struct ImportItems {
/// Items to emit as `SET` events.
pub items: Vec<WrappedItem>,
/// Items to emit as `DEL` events; an empty list when absent from the input.
#[serde(default)]
pub delete_items: Vec<WrappedItem>,
}
impl crate::command::CommandHandler for ImportItems {
    /// Converts the command's items into events and emits them as one batch.
    ///
    /// Every entry of `items` becomes a `SET` event and every entry of
    /// `delete_items` becomes a `DEL` event, with all `SET` events placed
    /// before the `DEL` events. All events share the transaction id, creation
    /// timestamp and host id (as `source_id`) taken from `ctx`.
    ///
    /// Returns whatever `ctx.emit_event_batch` returns.
    fn execute(
        self,
        ctx: crate::command::CommandContext,
    ) -> Result<usize, crate::command::CommandError> {
        use crate::wire::{MEvent, MEventType};

        let tx = ctx.tx().to_string();
        let source_id = Some(ctx.host_id().to_string());
        let created_at = ctx.created_at().to_string();

        let mut events = Vec::with_capacity(self.items.len() + self.delete_items.len());

        // `self` is owned, so the vectors are consumed and each item payload
        // is moved into its event instead of being cloned.
        events.extend(self.items.into_iter().map(|wrapped| MEvent {
            item_type: wrapped.item_type.to_string(),
            item: wrapped.item,
            change_type: MEventType::SET,
            created_at: created_at.clone(),
            tx: tx.clone(),
            source_id: source_id.clone(),
            options: None,
        }));
        events.extend(self.delete_items.into_iter().map(|wrapped| MEvent {
            item_type: wrapped.item_type.to_string(),
            item: wrapped.item,
            change_type: MEventType::DEL,
            created_at: created_at.clone(),
            tx: tx.clone(),
            source_id: source_id.clone(),
            options: None,
        }));

        ctx.emit_event_batch(events)
    }
}
/// Snapshot of persistence health counters, declared through
/// `myko_report_output` and used as the output of `GetPersistHealth`.
#[myko_macros::myko_report_output]
pub struct PersistHealthStatus {
/// Value of the `queued` counter at sampling time.
pub queued: u64,
/// Value of the `total_persisted` counter at sampling time.
pub total_persisted: u64,
/// Value of the `total_errors` counter at sampling time.
pub total_errors: u64,
/// Value of the `consecutive_errors` counter at sampling time.
pub consecutive_errors: u64,
/// Most recent error message, if any; left out of serialized output when `None`.
// NOTE(review): `optional = nullable` presumably makes the generated
// TypeScript field both optional and nullable — confirm against ts-rs docs.
#[serde(skip_serializing_if = "Option::is_none")]
#[ts(optional = nullable)]
pub last_error: Option<String>,
/// Health flag; the native handler in this file sets it to
/// `consecutive_errors == 0`.
pub healthy: bool,
/// Write rate as reported by the health source's `writes_per_second()`.
pub writes_per_second: f64,
}
/// Parameterless report declared through `myko_report` with an output of
/// `PersistHealthStatus`.
#[myko_macros::myko_report(PersistHealthStatus)]
pub struct GetPersistHealth {}
#[cfg(not(target_arch = "wasm32"))]
impl ReportHandler for GetPersistHealth {
    type Output = PersistHealthStatus;

    /// Builds a cell that re-samples the persistence health counters on a
    /// 500 ms `interval`.
    ///
    /// Each tick reads the atomic counters with relaxed ordering, copies the
    /// last error message, and asks the health source for its write rate.
    /// `healthy` is `true` exactly when `consecutive_errors` is zero. The
    /// mapped cell is passed through `deduped()`.
    // NOTE(review): `deduped()` presumably suppresses consecutive equal
    // snapshots — confirm against hyphae.
    fn compute(&self, ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let health = ctx.persist_health();
        interval(Duration::from_millis(500))
            .map(move |_tick| {
                let queued = health.queued.load(Ordering::Relaxed);
                let total_persisted = health.total_persisted.load(Ordering::Relaxed);
                let total_errors = health.total_errors.load(Ordering::Relaxed);
                let consecutive_errors = health.consecutive_errors.load(Ordering::Relaxed);
                // A poisoned lock only means some writer panicked while holding
                // it. The stored message is still readable, and the health
                // report is most needed then, so recover the guard instead of
                // panicking inside the sampler.
                let last_error = health
                    .last_error
                    .read()
                    .unwrap_or_else(|poisoned| poisoned.into_inner())
                    .clone();
                let writes_per_second = health.writes_per_second();
                Arc::new(PersistHealthStatus {
                    queued,
                    total_persisted,
                    total_errors,
                    consecutive_errors,
                    last_error,
                    healthy: consecutive_errors == 0,
                    writes_per_second,
                })
            })
            .deduped()
    }
}
#[cfg(target_arch = "wasm32")]
impl ReportHandler for GetPersistHealth {
    type Output = PersistHealthStatus;

    /// `wasm32` variant: produces an immutable cell holding a fixed snapshot
    /// with all counters at zero, no last error, and `healthy` set to `true`.
    ///
    /// The context is not consulted.
    fn compute(&self, _ctx: ReportContext) -> Cell<Arc<Self::Output>, CellImmutable> {
        let idle_snapshot = PersistHealthStatus {
            queued: 0,
            total_persisted: 0,
            total_errors: 0,
            consecutive_errors: 0,
            last_error: None,
            healthy: true,
            writes_per_second: 0.0,
        };
        Cell::new(Arc::new(idle_snapshot)).lock()
    }
}
// Passes the data types declared in this file to the crate-level
// `register_ts_export!` macro.
// NOTE(review): the macro is defined elsewhere; presumably it registers these
// types for TypeScript binding generation — confirm.
crate::register_ts_export!(
ItemStub,
EntitySnapshotDifferenceData,
LogLevel,
EventContainer,
PersistHealthStatus
);