use crate::storage::models::*;
use crate::storage::{StorageError, StorageResult};
use sqlx::PgPool;
pub struct SimpleRelationalStore {
pool: PgPool,
}
impl SimpleRelationalStore {
pub async fn new(database_url: &str) -> StorageResult<Self> {
let pool = PgPool::connect(database_url)
.await
.map_err(|e| StorageError::Connection(e.to_string()))?;
Ok(Self { pool })
}
pub async fn initialize(&self) -> StorageResult<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS events (
id BIGSERIAL PRIMARY KEY,
event_id TEXT NOT NULL,
event_type TEXT NOT NULL,
chat_id TEXT NOT NULL,
user_id TEXT,
timestamp TIMESTAMPTZ NOT NULL,
raw_payload JSONB NOT NULL,
processed_data JSONB
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| StorageError::Connection(e.to_string()))?;
Ok(())
}
pub async fn health_check(&self) -> StorageResult<()> {
sqlx::query("SELECT 1")
.execute(&self.pool)
.await
.map_err(|e| StorageError::Connection(e.to_string()))?;
Ok(())
}
pub async fn get_stats(&self, _chat_id: Option<&str>) -> StorageResult<EventStats> {
Ok(EventStats {
total_events: 0,
total_messages: 0,
unique_chats: 0,
unique_users: 0,
events_last_24h: 0,
events_last_week: 0,
oldest_event: None,
newest_event: None,
storage_size_bytes: Some(0),
})
}
pub async fn cleanup_old_data(&self, _older_than_days: u32) -> StorageResult<i64> {
Ok(0)
}
pub async fn search_messages(
&self,
_query: &str,
_chat_id: Option<&str>,
_limit: i64,
) -> StorageResult<Vec<Message>> {
Ok(vec![])
}
pub async fn search_events_advanced(
&self,
_user_id: Option<&str>,
_event_type: Option<&str>,
_since: Option<chrono::DateTime<chrono::Utc>>,
_until: Option<chrono::DateTime<chrono::Utc>>,
_limit: i64,
) -> StorageResult<Vec<Event>> {
Ok(vec![])
}
pub async fn get_recent_events(
&self,
_chat_id: Option<&str>,
_event_type: Option<&str>,
_limit: i64,
) -> StorageResult<Vec<Event>> {
Ok(vec![])
}
pub async fn migrate(&self) -> StorageResult<()> {
Ok(())
}
pub async fn insert_event(&self, _new_event: NewEvent) -> StorageResult<i64> {
Ok(1)
}
pub async fn insert_message(&self, _new_message: NewMessage) -> StorageResult<i64> {
Ok(1)
}
pub async fn store_event(&self, new_event: NewEvent) -> StorageResult<i64> {
self.insert_event(new_event).await
}
pub async fn store_message(&self, new_message: NewMessage) -> StorageResult<i64> {
self.insert_message(new_message).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn create_test_event() -> NewEvent {
NewEvent {
event_id: "test_event_123".to_string(),
event_type: "newMessage".to_string(),
chat_id: "test_chat".to_string(),
user_id: Some("test_user".to_string()),
timestamp: Utc::now(),
raw_payload: serde_json::json!({"test": "data"}),
processed_data: Some(serde_json::json!({"processed": true})),
}
}
fn create_test_message() -> NewMessage {
NewMessage {
event_id: 1,
message_id: "msg_123".to_string(),
chat_id: "test_chat".to_string(),
user_id: "test_user".to_string(),
text: Some("Test message content".to_string()),
formatted_text: None,
reply_to_message_id: None,
forward_from_chat_id: None,
forward_from_message_id: None,
file_attachments: Some(serde_json::json!({"original": "message"})),
has_mentions: false,
mentions: Some(serde_json::json!({"meta": "data"})),
timestamp: Utc::now(),
}
}
#[tokio::test]
async fn test_simple_store_creation_invalid_url() {
let result = SimpleRelationalStore::new("invalid://database/url").await;
assert!(result.is_err());
if let Err(StorageError::Connection(msg)) = result {
assert!(!msg.is_empty());
} else {
panic!("Expected Connection error");
}
}
#[tokio::test]
async fn test_simple_store_methods_interface() {
let mock_pool = sqlx::PgPool::connect("postgres://fake:fake@localhost:5432/fake").await;
match mock_pool {
Ok(pool) => {
let store = SimpleRelationalStore { pool };
let _health_result = store.health_check().await;
let _stats_result = store.get_stats(None).await;
let _cleanup_result = store.cleanup_old_data(30).await;
let _search_result = store.search_messages("test", None, 10).await;
let _events_result = store.get_recent_events(None, None, 10).await;
let _migrate_result = store.migrate().await;
}
Err(_) => {
}
}
}
#[test]
fn test_create_test_event() {
let event = create_test_event();
assert_eq!(event.event_id, "test_event_123");
assert_eq!(event.chat_id, "test_chat");
assert_eq!(event.user_id, Some("test_user".to_string()));
assert_eq!(event.event_type, "newMessage");
assert_eq!(event.raw_payload["test"], "data");
assert_eq!(event.processed_data.unwrap()["processed"], true);
}
#[test]
fn test_create_test_message() {
let message = create_test_message();
assert_eq!(message.message_id, "msg_123");
assert_eq!(message.chat_id, "test_chat");
assert_eq!(message.user_id, "test_user");
assert_eq!(message.text, Some("Test message content".to_string()));
assert_eq!(message.file_attachments.unwrap()["original"], "message");
assert_eq!(message.mentions.unwrap()["meta"], "data");
}
#[test]
fn test_event_stats_default() {
let stats = EventStats {
total_events: 100,
total_messages: 80,
unique_chats: 5,
unique_users: 20,
events_last_24h: 10,
events_last_week: 50,
oldest_event: None,
newest_event: None,
storage_size_bytes: Some(1024),
};
assert_eq!(stats.total_events, 100);
assert_eq!(stats.total_messages, 80);
assert_eq!(stats.unique_chats, 5);
assert_eq!(stats.unique_users, 20);
assert_eq!(stats.events_last_24h, 10);
assert_eq!(stats.events_last_week, 50);
assert!(stats.oldest_event.is_none());
assert!(stats.newest_event.is_none());
assert_eq!(stats.storage_size_bytes, Some(1024));
}
#[tokio::test]
async fn test_dummy_implementations() {
let store =
if let Ok(pool) = sqlx::PgPool::connect("postgres://fake:fake@localhost/fake").await {
SimpleRelationalStore { pool }
} else {
return;
};
if let Ok(stats) = store.get_stats(Some("test_chat")).await {
assert_eq!(stats.total_events, 0);
assert_eq!(stats.total_messages, 0);
assert_eq!(stats.unique_chats, 0);
}
if let Ok(count) = store.cleanup_old_data(30).await {
assert_eq!(count, 0);
}
if let Ok(messages) = store.search_messages("test", None, 10).await {
assert!(messages.is_empty());
}
if let Ok(events) = store
.search_events_advanced(None, None, None, None, 10)
.await
{
assert!(events.is_empty());
}
if let Ok(events) = store.get_recent_events(None, None, 10).await {
assert!(events.is_empty());
}
let test_event = create_test_event();
if let Ok(id) = store.store_event(test_event).await {
assert_eq!(id, 1);
}
let test_message = create_test_message();
if let Ok(id) = store.store_message(test_message).await {
assert_eq!(id, 1);
}
}
#[test]
fn test_storage_error_types() {
let connection_error = StorageError::Connection("DB connection failed".to_string());
let query_error = StorageError::Query("Invalid SQL".to_string());
match connection_error {
StorageError::Connection(msg) => assert_eq!(msg, "DB connection failed"),
_ => panic!("Expected Connection error"),
}
match query_error {
StorageError::Query(msg) => assert_eq!(msg, "Invalid SQL"),
_ => panic!("Expected Query error"),
}
}
#[test]
fn test_event_type_strings() {
let message_type = "newMessage";
let edit_type = "editedMessage";
let delete_type = "deleteMessage";
assert_eq!(message_type, "newMessage");
assert_eq!(edit_type, "editedMessage");
assert_eq!(delete_type, "deleteMessage");
}
#[test]
fn test_message_handling() {
let message = create_test_message();
assert!(message.text.is_some());
assert!(!message.has_mentions);
assert!(message.file_attachments.is_some());
assert!(message.mentions.is_some());
}
#[test]
fn test_json_serialization() {
let json_data = serde_json::json!({
"test": "value",
"number": 42,
"array": [1, 2, 3],
"nested": {
"key": "nested_value"
}
});
assert_eq!(json_data["test"], "value");
assert_eq!(json_data["number"], 42);
assert_eq!(json_data["array"][0], 1);
assert_eq!(json_data["nested"]["key"], "nested_value");
let json_string = serde_json::to_string(&json_data).unwrap();
let parsed_back: serde_json::Value = serde_json::from_str(&json_string).unwrap();
assert_eq!(parsed_back, json_data);
}
#[test]
fn test_utc_timestamp() {
let now = Utc::now();
let earlier = now - chrono::Duration::seconds(3600);
assert!(now > earlier);
assert_eq!(now.timezone(), Utc);
let timestamp_str = now.to_rfc3339();
assert!(timestamp_str.contains('T'));
assert!(
timestamp_str.ends_with('Z')
|| timestamp_str.contains('+')
|| timestamp_str.contains('-')
);
}
}