Skip to main content

robson_core/entities/
process_event.rs

1use anyhow::Result;
2use chrono::Utc;
3use sea_orm::entity::prelude::*;
4use sea_orm::{ActiveValue::Set, QueryOrder};
5use serde::{Deserialize, Serialize};
6
7#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
8#[sea_orm(table_name = "process_events")]
9pub struct Model {
10    #[sea_orm(primary_key)]
11    pub id: i32,
12    pub conversation_id: i32,
13    pub kind: String,
14    pub content: String,
15    pub created_at: String,
16    /// Unix epoch timestamp set when all registered gateways have delivered this event.
17    /// NULL means delivery is still pending for at least one gateway.
18    pub delivered_at: Option<i64>,
19}
20
21#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
22pub enum Relation {}
23
24impl ActiveModelBehavior for ActiveModel {}
25
26impl Model {
27    pub async fn insert(
28        db: &DatabaseConnection,
29        conversation_id: i32,
30        kind: &str,
31        content: &str,
32    ) -> Result<()> {
33        let now = Utc::now().to_rfc3339();
34        let active = ActiveModel {
35            conversation_id: Set(conversation_id),
36            kind: Set(kind.to_string()),
37            content: Set(content.to_string()),
38            created_at: Set(now),
39            delivered_at: Set(None),
40            ..Default::default()
41        };
42        active.insert(db).await?;
43        Ok(())
44    }
45
46    /// Returns all process_events where delivered_at IS NULL (not yet fully delivered).
47    pub async fn find_undelivered(db: &DatabaseConnection) -> Result<Vec<Model>> {
48        let rows = Entity::find()
49            .filter(Column::DeliveredAt.is_null())
50            .order_by_asc(Column::Id)
51            .all(db)
52            .await?;
53        Ok(rows)
54    }
55
56    /// Marks a process_event as fully delivered (all gateways done) using current Unix epoch.
57    pub async fn mark_delivered(db: &DatabaseConnection, id: i32) -> Result<()> {
58        let now = Utc::now().timestamp();
59        let active = ActiveModel {
60            id: Set(id),
61            delivered_at: Set(Some(now)),
62            ..Default::default()
63        };
64        active.update(db).await?;
65        Ok(())
66    }
67
68    pub async fn find_by_conversation(
69        db: &DatabaseConnection,
70        conversation_id: i32,
71    ) -> Result<Vec<Model>> {
72        let rows = Entity::find()
73            .filter(Column::ConversationId.eq(conversation_id))
74            .order_by_asc(Column::CreatedAt)
75            .all(db)
76            .await?;
77        Ok(rows)
78    }
79}