use crate::config::{DatabaseDriver, DatabaseSinkConfig, FileSinkConfig};
use crate::error::InklogError;
use crate::log_record::LogRecord;
use crate::sink::file::FileSink;
use crate::sink::{CircuitBreaker, LogSink};
use chrono::{Datelike, Timelike, Utc};
use sea_orm::entity::prelude::*;
use sea_orm::{
    ConnectOptions, ConnectionTrait, Database, DatabaseConnection, EntityTrait, QueryFilter,
    QuerySelect, Schema, Set,
};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
use serde::Serialize;
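/// One persisted log event; maps to a row in the `logs` table.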
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize)]
#[sea_orm(table_name = "logs")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub timestamp: DateTimeUtc,
pub level: String,
pub target: String,
#[sea_orm(column_type = "Text")]
pub message: String,
#[sea_orm(column_type = "Json", nullable)]
pub fields: Option<serde_json::Value>,
pub file: Option<String>,
pub line: Option<i32>,
pub thread_id: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
mod archive_metadata {
use sea_orm::entity::prelude::*;
use serde::Serialize;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize)]
#[sea_orm(table_name = "archive_metadata")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub archive_date: DateTimeUtc,
pub s3_key: String,
pub record_count: i64,
pub file_size: i64,
pub status: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
}
use archive_metadata::ActiveModel as ArchiveMetadataActiveModel;
use archive_metadata::Entity as ArchiveMetadataEntity;
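
// Identifier validators: table and partition names cannot be bound as SQL
// parameters, so anything interpolated into DDL below must first pass these
// strict whitelists (ASCII letters, digits, and underscores only).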
fn validate_table_name(name: &str) -> Result<String, InklogError> {
if name.is_empty() {
return Err(InklogError::DatabaseError(
"Table name cannot be empty".to_string(),
));
}
if name.len() > 128 {
return Err(InklogError::DatabaseError(
"Table name too long".to_string(),
));
}
let first_char = name
.chars()
.next()
.ok_or_else(|| InklogError::DatabaseError("Table name is empty".to_string()))?;
if !first_char.is_ascii_alphabetic() && first_char != '_' {
return Err(InklogError::DatabaseError(format!(
"Table name must start with letter or underscore, got: {}",
first_char
)));
}
if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
return Err(InklogError::DatabaseError(format!(
"Table name contains invalid characters: {}",
name
)));
}
Ok(name.to_string())
}
fn validate_partition_name(partition_name: &str) -> Result<String, InklogError> {
    if !partition_name.is_ascii() {
        return Err(InklogError::DatabaseError(format!(
            "Partition name must be ASCII, got: {}",
            partition_name
        )));
    }
    if !partition_name.starts_with("logs_") {
        return Err(InklogError::DatabaseError(format!(
            "Partition name must start with 'logs_', got: {}",
            partition_name
        )));
    }
    // Expected shape: logs_YYYY_MM. Byte slicing below is safe now that the
    // name is known to be ASCII.
    let date_part = &partition_name[5..];
    if date_part.len() != 7 || date_part.chars().nth(4) != Some('_') {
return Err(InklogError::DatabaseError(format!(
"Invalid partition date format, expected YYYY_MM, got: {}",
date_part
)));
}
let year = &date_part[..4];
let month = &date_part[5..];
    if !year.chars().all(|c| c.is_ascii_digit()) {
        return Err(InklogError::DatabaseError(format!(
            "Invalid year in partition name: {}",
            year
        )));
    }
    let month_num: u32 = month.parse().map_err(|_| {
        InklogError::DatabaseError(format!("Invalid month in partition name: {}", month))
    })?;
    if !(1..=12).contains(&month_num) {
        return Err(InklogError::DatabaseError(format!(
            "Invalid month value in partition name: {}",
            month_num
        )));
    }
Ok(partition_name.to_string())
}
fn validate_date_format(date_str: &str) -> Result<(), InklogError> {
    if date_str.len() != 10 || !date_str.is_ascii() {
        return Err(InklogError::DatabaseError(
            "Date must be in YYYY-MM-DD format".to_string(),
        ));
    }
    // Byte slicing below is safe: the string is ASCII and exactly 10 bytes.
    if &date_str[4..5] != "-" || &date_str[7..8] != "-" {
        return Err(InklogError::DatabaseError(
            "Date must be in YYYY-MM-DD format with hyphens".to_string(),
        ));
    }
let year = &date_str[0..4];
if !year.chars().all(|c| c.is_ascii_digit()) {
return Err(InklogError::DatabaseError(
"Year must be numeric".to_string(),
));
}
let month = &date_str[5..7];
if !month.chars().all(|c| c.is_ascii_digit()) {
return Err(InklogError::DatabaseError(
"Month must be numeric".to_string(),
));
}
let month_num: u32 = month.parse().unwrap_or(0);
if !(1..=12).contains(&month_num) {
return Err(InklogError::DatabaseError(format!(
"Invalid month: {}",
month_num
)));
}
let day = &date_str[8..10];
if !day.chars().all(|c| c.is_ascii_digit()) {
return Err(InklogError::DatabaseError(
"Day must be numeric".to_string(),
));
}
chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
.map_err(|_| InklogError::DatabaseError(format!("Invalid date: {}", date_str)))?;
Ok(())
}
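
// Illustrative checks for the validators above; the inputs are hypothetical,
// but the accepted/rejected shapes follow directly from the rules they encode.
#[cfg(test)]
mod validation_examples {
    use super::*;

    #[test]
    fn table_names_are_whitelisted() {
        assert!(validate_table_name("logs").is_ok());
        assert!(validate_table_name("_logs_v2").is_ok());
        assert!(validate_table_name("1logs").is_err()); // must not start with a digit
        assert!(validate_table_name("logs; DROP TABLE x").is_err()); // no punctuation
    }

    #[test]
    fn partition_names_follow_logs_yyyy_mm() {
        assert!(validate_partition_name("logs_2024_06").is_ok());
        assert!(validate_partition_name("logs_2024_13").is_err()); // month out of range
        assert!(validate_partition_name("metrics_2024_06").is_err()); // wrong prefix
    }

    #[test]
    fn dates_must_be_valid_yyyy_mm_dd() {
        assert!(validate_date_format("2024-06-01").is_ok());
        assert!(validate_date_format("2024-02-30").is_err()); // no such day
        assert!(validate_date_format("2024/06/01").is_err()); // wrong separators
    }
}

/// Buffered, batching database sink with a circuit breaker and a file
/// fallback for database outages.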
pub struct DatabaseSink {
config: DatabaseSinkConfig,
buffer: Vec<LogRecord>,
last_flush: Instant,
last_archive_check: chrono::DateTime<chrono::Utc>,
last_partition_check: chrono::DateTime<chrono::Utc>,
rt: Runtime,
db: Option<DatabaseConnection>,
fallback_sink: Option<FileSink>,
circuit_breaker: CircuitBreaker,
}
impl DatabaseSink {
pub fn new(config: DatabaseSinkConfig) -> Result<Self, InklogError> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(std::cmp::max(2, num_cpus::get()))
.thread_name("inklog-db-worker")
.enable_all()
.build()
.map_err(InklogError::IoError)?;
let fallback_config = FileSinkConfig {
enabled: true,
path: PathBuf::from("logs/db_fallback.log"),
..Default::default()
};
let fallback_sink = FileSink::new(fallback_config).ok();
let mut sink = Self {
config: config.clone(),
buffer: Vec::with_capacity(config.batch_size),
last_flush: Instant::now(),
last_archive_check: Utc::now(),
last_partition_check: Utc::now() - chrono::Duration::days(1),
rt,
db: None,
fallback_sink,
circuit_breaker: CircuitBreaker::new(5, Duration::from_secs(30)),
};
        // A failed initial connection is not fatal: the sink keeps running,
        // retries on the next flush, and uses the file fallback meanwhile.
        if let Err(e) = sink.init_db() {
            tracing::warn!(error = %e, "Initial database connection failed; falling back to file sink until reconnect");
        }
        Ok(sink)
}
fn init_db(&mut self) -> Result<(), InklogError> {
let url = self.config.url.clone();
let pool_size = self.config.pool_size;
let db = self
.rt
.block_on(async {
let mut opt = ConnectOptions::new(url);
opt.max_connections(pool_size)
.min_connections(2)
.connect_timeout(Duration::from_secs(5))
.idle_timeout(Duration::from_secs(8));
Database::connect(opt).await
})
.map_err(|e| InklogError::DatabaseError(e.to_string()))?;
self.rt
.block_on(async {
let builder = db.get_database_backend();
let schema = Schema::new(builder);
match self.config.driver {
DatabaseDriver::PostgreSQL => {
                        let stmt =
                            builder.build(schema.create_table_from_entity(Entity).if_not_exists());
                        db.execute(stmt)
                            .await
                            .map_err(|e| InklogError::DatabaseError(e.to_string()))?;
}
DatabaseDriver::MySQL => {
let create_table_sql = r#"
CREATE TABLE IF NOT EXISTS `logs` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`timestamp` DATETIME(3) NOT NULL,
`level` VARCHAR(20) NOT NULL,
`target` VARCHAR(255) NOT NULL,
`message` TEXT NOT NULL,
`fields` JSON,
`file` VARCHAR(512),
`line` INT,
`thread_id` VARCHAR(100) NOT NULL,
INDEX `idx_timestamp` (`timestamp`),
INDEX `idx_level` (`level`),
INDEX `idx_target` (`target`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"#;
                        db.execute_unprepared(create_table_sql)
                            .await
                            .map_err(|e| InklogError::DatabaseError(e.to_string()))?;
}
DatabaseDriver::SQLite => {
let create_table_sql = r#"
CREATE TABLE IF NOT EXISTS "logs" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"timestamp" TEXT NOT NULL,
"level" TEXT NOT NULL,
"target" TEXT NOT NULL,
"message" TEXT NOT NULL,
"fields" TEXT,
"file" TEXT,
"line" INTEGER,
"thread_id" TEXT NOT NULL
)
"#;
                        db.execute_unprepared(create_table_sql)
                            .await
                            .map_err(|e| InklogError::DatabaseError(e.to_string()))?;
let create_index_sql = r#"
CREATE INDEX IF NOT EXISTS "idx_logs_timestamp" ON "logs" ("timestamp")
"#;
                        db.execute_unprepared(create_index_sql)
                            .await
                            .map_err(|e| InklogError::DatabaseError(e.to_string()))?;
}
}
let stmt_archive = builder.build(
schema
.create_table_from_entity(ArchiveMetadataEntity)
.if_not_exists(),
);
                db.execute(stmt_archive)
                    .await
                    .map_err(|e| InklogError::DatabaseError(e.to_string()))?;
Ok::<(), InklogError>(())
            })?;
self.db = Some(db);
Ok(())
}
    /// Flushes buffered records to the database. When `force` is false the
    /// flush is skipped until the batch size or flush interval is reached;
    /// explicit `flush()`/`shutdown()` calls pass `force = true` so no
    /// records are left behind.
    fn flush_buffer(&mut self, force: bool) -> Result<(), InklogError> {
if self.buffer.is_empty() {
return Ok(());
}
let current_batch_size =
if self.circuit_breaker.state() == crate::sink::CircuitState::HalfOpen {
self.config.batch_size / 2
} else {
self.config.batch_size
};
        if !force
            && self.buffer.len() < current_batch_size
            && self.last_flush.elapsed() < Duration::from_millis(self.config.flush_interval_ms)
        {
            return Ok(());
        }
if !self.circuit_breaker.can_execute() {
self.fallback_to_file()?;
self.buffer.clear();
self.last_flush = Instant::now();
return Ok(());
}
let now = Utc::now();
let should_check_partition = now.date_naive() != self.last_partition_check.date_naive();
if should_check_partition {
self.last_partition_check = now;
}
let mut success = false;
        if let Some(db) = &self.db {
            // Build the batch without draining the buffer, so the records are
            // still available for the file fallback if the insert fails.
            let logs: Vec<ActiveModel> = self
                .buffer
                .iter()
                .map(|r| ActiveModel {
                    timestamp: Set(r.timestamp),
                    level: Set(r.level.clone()),
                    target: Set(r.target.clone()),
                    message: Set(r.message.clone()),
                    fields: Set(Some(
                        serde_json::to_value(&r.fields).unwrap_or(serde_json::Value::Null),
                    )),
                    file: Set(r.file.clone()),
                    line: Set(r.line.map(|l| l as i32)),
                    thread_id: Set(r.thread_id.clone()),
                    ..Default::default()
                })
                .collect();
let res = self.rt.block_on(async {
match self.config.driver {
DatabaseDriver::PostgreSQL => {
if should_check_partition {
let partition_name = format!("logs_{}", now.format("%Y_%m"));
let validated_partition = match validate_partition_name(&partition_name) {
Ok(name) => name,
Err(e) => {
tracing::error!("Partition name validation failed: {}", e);
return Err(sea_orm::DbErr::Query(
sea_orm::RuntimeErr::Internal(e.to_string())
));
}
};
let start_date = now.format("%Y-%m-01").to_string();
let next_month = if now.month() == 12 {
format!("{}-01-01", now.year() + 1)
} else {
format!("{}-{:02}-01", now.year(), now.month() + 1)
};
let validated_table = match validate_table_name(&self.config.table_name) {
Ok(name) => name,
Err(e) => {
tracing::error!("Table name validation failed: {}", e);
return Err(sea_orm::DbErr::Query(
sea_orm::RuntimeErr::Internal(e.to_string())
));
}
};
let quoted_table = format!("\"{}\"", validated_table);
let quoted_partition = format!("\"{}\"", validated_partition);
if let Err(e) = validate_date_format(&start_date) {
return Err(sea_orm::DbErr::Query(
sea_orm::RuntimeErr::Internal(e.to_string())
));
}
if let Err(e) = validate_date_format(&next_month) {
return Err(sea_orm::DbErr::Query(
sea_orm::RuntimeErr::Internal(e.to_string())
));
}
let sql = format!(
"CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM ('{}') TO ('{}')",
quoted_partition, quoted_table, start_date, next_month
);
                            let _ = db.execute_unprepared(&sql).await;
}
}
                    DatabaseDriver::MySQL => {
                        if should_check_partition {
                            let partition_name = format!("logs_{}", now.format("%Y_%m"));
                            let validated_partition = match validate_partition_name(&partition_name) {
                                Ok(name) => name,
                                Err(e) => {
                                    tracing::error!("Partition name validation failed: {}", e);
                                    return Err(sea_orm::DbErr::Query(
                                        sea_orm::RuntimeErr::Internal(e.to_string())
                                    ));
                                }
                            };
                            // MySQL has no PostgreSQL-style `PARTITION OF`: monthly
                            // range partitions are added to an already-partitioned
                            // table via ALTER TABLE, bounded by the start of the
                            // next month. Failures (partition already exists, table
                            // not partitioned) are intentionally ignored.
                            let next_month = if now.month() == 12 {
                                format!("{}-01-01", now.year() + 1)
                            } else {
                                format!("{}-{:02}-01", now.year(), now.month() + 1)
                            };
                            if let Err(e) = validate_date_format(&next_month) {
                                return Err(sea_orm::DbErr::Query(
                                    sea_orm::RuntimeErr::Internal(e.to_string())
                                ));
                            }
                            let partition_sql = format!(
                                "ALTER TABLE `logs` ADD PARTITION (PARTITION `{}` VALUES LESS THAN (TO_DAYS('{}')))",
                                validated_partition, next_month
                            );
                            let _ = db.execute_unprepared(&partition_sql).await;
                        }
                    }
DatabaseDriver::SQLite => {}
}
Entity::insert_many(logs).exec(db).await
});
match res {
Ok(_) => {
self.circuit_breaker.record_success();
success = true;
}
Err(e) => {
tracing::error!(error = %e, "Database insert failed");
self.circuit_breaker.record_failure();
let _ = self.init_db();
}
}
}
if !success {
self.fallback_to_file()?;
}
self.buffer.clear();
self.last_flush = Instant::now();
Ok(())
}
fn fallback_to_file(&mut self) -> Result<(), InklogError> {
if let Some(sink) = &mut self.fallback_sink {
for record in &self.buffer {
let _ = sink.write(record);
}
}
Ok(())
}
}
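
// The month-boundary arithmetic used by flush_buffer when creating monthly
// partitions, extracted here as a small illustrative test (December must
// roll over into January of the following year).
#[cfg(test)]
mod partition_boundary_examples {
    use chrono::{Datelike, TimeZone, Utc};

    // Mirrors the next-month computation in flush_buffer.
    fn next_month_start(now: chrono::DateTime<Utc>) -> String {
        if now.month() == 12 {
            format!("{}-01-01", now.year() + 1)
        } else {
            format!("{}-{:02}-01", now.year(), now.month() + 1)
        }
    }

    #[test]
    fn december_rolls_into_next_year() {
        let dec = Utc.with_ymd_and_hms(2024, 12, 15, 3, 0, 0).unwrap();
        assert_eq!(next_month_start(dec), "2025-01-01");
        let jun = Utc.with_ymd_and_hms(2024, 6, 1, 0, 0, 0).unwrap();
        assert_eq!(next_month_start(jun), "2024-07-01");
    }
}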
impl LogSink for DatabaseSink {
fn write(&mut self, record: &LogRecord) -> Result<(), InklogError> {
self.buffer.push(record.clone());
if self.buffer.len() >= self.config.batch_size
|| self.last_flush.elapsed() >= Duration::from_millis(self.config.flush_interval_ms)
{
            if let Err(e) = self.flush_buffer(false) {
tracing::error!(error = ?e, "Failed to flush database buffer");
}
}
if self.config.archive_to_s3 {
let now = Utc::now();
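            // Archive at most once per day, during the 02:00 UTC hour; note
            // this only triggers if a write happens within that hour.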
if now.hour() == 2 && self.last_archive_check.date_naive() != now.date_naive() {
self.last_archive_check = now;
let db_opt = self.db.clone();
let config = self.config.clone();
if let Some(db) = db_opt {
let res = self.rt.block_on(async move {
let days = config.archive_after_days as i64;
let cutoff = Utc::now() - chrono::Duration::days(days);
let logs = Entity::find()
.filter(Column::Timestamp.lt(cutoff))
.limit(1000)
.all(&db)
.await
.map_err(|e| InklogError::DatabaseError(e.to_string()))?;
if logs.is_empty() {
return Ok(());
}
let parquet_data = convert_logs_to_parquet(&logs, &config.parquet_config).map_err(|e| {
InklogError::SerializationError(serde_json::Error::io(
std::io::Error::other(e.to_string()),
))
})?;
let file_size = parquet_data.len() as i64;
#[cfg(feature = "aws")]
{
if let (Some(bucket), Some(region)) =
(&config.s3_bucket, &config.s3_region)
{
let aws_config = aws_config::from_env()
.region(aws_types::region::Region::new(region.clone()))
.load()
.await;
let client = aws_sdk_s3::Client::new(&aws_config);
let key = format!(
"{}/{}/logs_{}.parquet",
Utc::now().format("%Y"),
Utc::now().format("%m"),
Utc::now().format("%d_%H%M%S")
);
client
.put_object()
.bucket(bucket)
.key(&key)
.body(parquet_data.into())
.storage_class(aws_sdk_s3::types::StorageClass::Glacier)
.send()
.await
.map_err(|e| InklogError::S3Error(e.to_string()))?;
let meta = ArchiveMetadataActiveModel {
archive_date: Set(Utc::now()),
s3_key: Set(key),
record_count: Set(logs.len() as i64),
file_size: Set(file_size),
status: Set("SUCCESS".to_string()),
..Default::default()
};
ArchiveMetadataEntity::insert(meta)
.exec(&db)
.await
.map_err(|e| InklogError::DatabaseError(e.to_string()))?;
let ids: Vec<i64> = logs.iter().map(|l| l.id).collect();
Entity::delete_many()
.filter(Column::Id.is_in(ids))
.exec(&db)
.await
.map_err(|e| InklogError::DatabaseError(e.to_string()))?;
}
}
#[cfg(not(feature = "aws"))]
{
let archive_dir = std::path::Path::new("logs/archive");
if let Err(e) = std::fs::create_dir_all(archive_dir) {
tracing::error!(error = %e, "Failed to create archive directory");
} else {
let filename =
format!("logs_{}.parquet", Utc::now().format("%Y%m%d_%H%M%S"));
let filepath = archive_dir.join(&filename);
if let Err(e) = std::fs::write(&filepath, &parquet_data) {
tracing::error!(error = %e, "Failed to write archive file");
} else {
let meta = ArchiveMetadataActiveModel {
archive_date: Set(Utc::now()),
s3_key: Set(format!("local/{}", filename)),
record_count: Set(logs.len() as i64),
file_size: Set(file_size),
status: Set("LOCAL_SUCCESS".to_string()),
..Default::default()
};
if let Err(e) = ArchiveMetadataEntity::insert(meta)
.exec(&db)
.await
.map_err(|e| InklogError::DatabaseError(e.to_string()))
{
tracing::error!(error = %e, "Failed to insert archive metadata");
}
let ids: Vec<i64> = logs.iter().map(|l| l.id).collect();
if let Err(e) = Entity::delete_many()
.filter(Column::Id.is_in(ids))
.exec(&db)
.await
.map_err(|e| InklogError::DatabaseError(e.to_string()))
{
tracing::error!(error = %e, "Failed to delete archived logs");
}
}
}
}
Ok::<(), InklogError>(())
});
if let Err(e) = res {
tracing::error!(error = %e, "Archive operation failed");
}
}
}
}
Ok(())
}
fn flush(&mut self) -> Result<(), InklogError> {
        self.flush_buffer(true)
}
fn is_healthy(&self) -> bool {
self.db.is_some()
}
fn shutdown(&mut self) -> Result<(), InklogError> {
        self.flush_buffer(true)?;
if let Some(db) = self.db.take() {
self.rt.block_on(async move {
let _ = db.close().await;
});
}
Ok(())
}
}
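
// A sketch of the circuit-breaker behaviour this sink leans on: once enough
// consecutive failures are recorded, can_execute() should return false and
// batches get routed to the file fallback. The exact threshold semantics are
// an assumption here; this mirrors how flush_buffer uses the breaker rather
// than specifying its contract.
#[cfg(test)]
mod circuit_breaker_examples {
    use super::*;

    #[test]
    fn breaker_opens_after_repeated_failures() {
        // Hypothetical settings: trip after 2 failures, 60s cooldown.
        let mut breaker = CircuitBreaker::new(2, Duration::from_secs(60));
        assert!(breaker.can_execute());
        for _ in 0..5 {
            breaker.record_failure();
        }
        assert!(!breaker.can_execute());
    }
}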
pub fn convert_logs_to_parquet(
logs: &[Model],
config: &crate::config::ParquetConfig,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;
use std::io::Cursor;
use std::sync::Arc;
    // The parquet writer rejects RLE_DICTIONARY as a column encoding
    // (dictionary encoding is toggled via `set_dictionary_enabled`), so the
    // "DICTIONARY" setting maps to that flag instead of `set_encoding`.
    let compression = Compression::ZSTD(Default::default());
    let mut props_builder = WriterProperties::builder()
        .set_compression(compression)
        .set_max_row_group_size(config.max_row_group_size);
    props_builder = match config.encoding.to_uppercase().as_str() {
        "DICTIONARY" => props_builder.set_dictionary_enabled(true),
        "RLE" => props_builder
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::RLE),
        _ => props_builder
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN),
    };
    let writer_props = props_builder.build();
let include_all = config.include_fields.is_empty();
let include_fields: std::collections::HashSet<String> =
config.include_fields.iter().cloned().collect();
let mut fields = Vec::new();
let mut arrays: Vec<ArrayRef> = Vec::new();
if include_all || include_fields.contains("id") {
let mut id_builder = Vec::with_capacity(logs.len());
for log in logs {
id_builder.push(log.id);
}
fields.push(Field::new("id", DataType::Int64, false));
arrays.push(Arc::new(Int64Array::from(id_builder)) as ArrayRef);
}
if include_all || include_fields.contains("timestamp") {
let mut timestamp_builder = Vec::with_capacity(logs.len());
for log in logs {
timestamp_builder.push(log.timestamp.to_rfc3339());
}
fields.push(Field::new("timestamp", DataType::Utf8, false));
arrays.push(Arc::new(StringArray::from(timestamp_builder)) as ArrayRef);
}
if include_all || include_fields.contains("level") {
let mut level_builder = Vec::with_capacity(logs.len());
for log in logs {
level_builder.push(log.level.clone());
}
fields.push(Field::new("level", DataType::Utf8, false));
arrays.push(Arc::new(StringArray::from(level_builder)) as ArrayRef);
}
if include_all || include_fields.contains("target") {
let mut target_builder = Vec::with_capacity(logs.len());
for log in logs {
target_builder.push(log.target.clone());
}
fields.push(Field::new("target", DataType::Utf8, false));
arrays.push(Arc::new(StringArray::from(target_builder)) as ArrayRef);
}
if include_all || include_fields.contains("message") {
let mut message_builder = Vec::with_capacity(logs.len());
for log in logs {
message_builder.push(log.message.clone());
}
fields.push(Field::new("message", DataType::Utf8, false));
arrays.push(Arc::new(StringArray::from(message_builder)) as ArrayRef);
}
if include_all || include_fields.contains("fields") {
let mut fields_builder = Vec::with_capacity(logs.len());
for log in logs {
fields_builder.push(serde_json::to_string(&log.fields).ok());
}
fields.push(Field::new("fields", DataType::Utf8, true));
arrays.push(Arc::new(StringArray::from(fields_builder)) as ArrayRef);
}
if include_all || include_fields.contains("file") {
let mut file_builder = Vec::with_capacity(logs.len());
for log in logs {
file_builder.push(log.file.clone());
}
fields.push(Field::new("file", DataType::Utf8, true));
arrays.push(Arc::new(StringArray::from(file_builder)) as ArrayRef);
}
if include_all || include_fields.contains("line") {
let mut line_builder = Vec::with_capacity(logs.len());
for log in logs {
line_builder.push(log.line.map(|l| l as i64));
}
fields.push(Field::new("line", DataType::Int64, true));
arrays.push(Arc::new(Int64Array::from(line_builder)) as ArrayRef);
}
if include_all || include_fields.contains("thread_id") {
let mut thread_id_builder = Vec::with_capacity(logs.len());
for log in logs {
thread_id_builder.push(log.thread_id.clone());
}
fields.push(Field::new("thread_id", DataType::Utf8, false));
arrays.push(Arc::new(StringArray::from(thread_id_builder)) as ArrayRef);
}
let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema.clone(), arrays)?;
let mut buffer = Vec::new();
let cursor = Cursor::new(&mut buffer);
let mut writer = ArrowWriter::try_new(cursor, schema, Some(writer_props))?;
writer.write(&batch)?;
writer.close()?;
Ok(buffer)
}
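
// A minimal end-to-end sketch of convert_logs_to_parquet. It assumes
// crate::config::ParquetConfig implements Default with usable values (a
// non-zero max_row_group_size in particular); the assertions are purely
// structural, since parquet files start and end with the 4-byte magic "PAR1".
#[cfg(test)]
mod parquet_examples {
    use super::*;

    #[test]
    fn produces_a_parquet_file_with_magic_bytes() {
        let logs = vec![Model {
            id: 1,
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            target: "app::demo".to_string(),
            message: "hello".to_string(),
            fields: None,
            file: Some("src/main.rs".to_string()),
            line: Some(42),
            thread_id: "main".to_string(),
        }];
        let config = crate::config::ParquetConfig::default();
        let bytes = convert_logs_to_parquet(&logs, &config).expect("conversion failed");
        assert!(bytes.starts_with(b"PAR1"));
        assert!(bytes.ends_with(b"PAR1"));
    }
}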