fraiseql-server 2.2.0

HTTP server for FraiseQL v2 GraphQL engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
//! Observer management module for fraiseql-server.
//!
//! This module provides HTTP endpoints for managing observers (CRUD operations)
//! and integrates with the `fraiseql-observers` crate for event processing.
//!
//! # Architecture
//!
//! ```text
//! HTTP API (this module)
//!     ↓
//! tb_observer (database)
//!     ↓
//! ObserverRuntime (runtime.rs)
//!     ↓
//! ChangeLogListener → ObserverExecutor
//!     ↓
//! Actions (webhook, email, etc.)
//! ```
//!
//! # Features
//!
//! - CRUD operations for observer definitions
//! - Runtime execution of observers via change log polling
//! - Execution logging and statistics
//! - Multi-tenancy support via `fk_customer_org`
//! - Soft delete support

pub mod config;
pub mod handlers;
pub mod repository;
pub mod routes;
pub mod runtime;

use chrono::{DateTime, Utc};
pub use config::ObserverManagementConfig;
pub use handlers::{ObserverState, RuntimeHealthState};
pub use repository::ObserverRepository;
pub use routes::{observer_routes, observer_runtime_routes};
pub use runtime::{ObserverRuntime, ObserverRuntimeConfig, RuntimeHealth};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;

/// Observer definition from the database.
///
/// The struct derives `FromRow`, so it can be built directly from a query
/// result whose columns carry the same names as the fields below. It also
/// derives `Serialize`/`Deserialize`, so the same shape is usable as a JSON
/// payload.
///
/// NOTE(review): the module docs name `tb_observer` as the backing table —
/// confirm the exact column list against the repository queries.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Observer {
    /// Internal primary key (Trinity pattern)
    pub pk_observer: i64,

    /// External UUID for API references
    pub id: Uuid,

    /// Human-readable name
    pub name: String,

    /// Description of what this observer does
    pub description: Option<String>,

    /// Entity type to observe (None = all types)
    pub entity_type: Option<String>,

    /// Event type to observe (None = all events)
    pub event_type: Option<String>,

    /// Condition expression (DSL filter)
    pub condition_expression: Option<String>,

    /// Actions to execute as JSON
    ///
    /// Stored as an untyped JSON value; presumably the serialized form of a
    /// list of [`ActionConfig`] — TODO confirm against the repository layer.
    pub actions: serde_json::Value,

    /// Whether this observer is enabled
    pub enabled: bool,

    /// Priority for ordering (lower = higher priority)
    pub priority: i32,

    /// Retry configuration as JSON
    ///
    /// Stored as an untyped JSON value; presumably the serialized form of
    /// [`RetryConfig`] — TODO confirm against the repository layer.
    pub retry_config: serde_json::Value,

    /// Timeout for action execution (milliseconds)
    pub timeout_ms: i32,

    /// Customer organization ID (multi-tenancy)
    pub fk_customer_org: Option<i64>,

    /// When the observer was created
    pub created_at: DateTime<Utc>,

    /// When the observer was last updated
    pub updated_at: DateTime<Utc>,

    /// Who created the observer
    pub created_by: Option<String>,

    /// Who last updated the observer
    pub updated_by: Option<String>,

    /// Soft delete timestamp (None = not deleted)
    pub deleted_at: Option<DateTime<Utc>>,
}

/// Request to create a new observer.
///
/// Only `name` and `actions` are mandatory when deserializing; every other
/// field carries a `#[serde(default)]` (or a named default function) and may
/// be omitted from the payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateObserverRequest {
    /// Human-readable name (required)
    pub name: String,

    /// Description of what this observer does
    #[serde(default)]
    pub description: Option<String>,

    /// Entity type to observe (None = all types)
    #[serde(default)]
    pub entity_type: Option<String>,

    /// Event type to observe (None = all events)
    #[serde(default)]
    pub event_type: Option<String>,

    /// Condition expression (DSL filter)
    #[serde(default)]
    pub condition_expression: Option<String>,

    /// Actions to execute
    ///
    /// Required: the field has no serde default. An empty list still
    /// deserializes successfully, so emptiness must be checked elsewhere if
    /// it is not acceptable.
    pub actions: Vec<ActionConfig>,

    /// Whether this observer is enabled (default: true)
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Priority for ordering (default: 100)
    #[serde(default = "default_priority")]
    pub priority: i32,

    /// Retry configuration
    ///
    /// `None` when omitted from the payload.
    #[serde(default)]
    pub retry_config: Option<RetryConfig>,

    /// Timeout for action execution in milliseconds (default: 30000)
    #[serde(default = "default_timeout")]
    pub timeout_ms: i32,
}

/// Request to update an existing observer.
///
/// Every field is optional, and the derived `Default` yields a request with
/// all fields set to `None`.
///
/// NOTE(review): presumably `None` means "leave the stored value unchanged" —
/// confirm in the repository layer. Because the fields are plain `Option<T>`
/// with `#[serde(default)]`, an omitted key and an explicit JSON `null` both
/// deserialize to `None`, so this type alone cannot express "clear this
/// field".
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpdateObserverRequest {
    /// Human-readable name
    #[serde(default)]
    pub name: Option<String>,

    /// Description
    #[serde(default)]
    pub description: Option<String>,

    /// Entity type to observe
    #[serde(default)]
    pub entity_type: Option<String>,

    /// Event type to observe
    #[serde(default)]
    pub event_type: Option<String>,

    /// Condition expression
    #[serde(default)]
    pub condition_expression: Option<String>,

    /// Actions to execute
    #[serde(default)]
    pub actions: Option<Vec<ActionConfig>>,

    /// Whether this observer is enabled
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Priority for ordering
    #[serde(default)]
    pub priority: Option<i32>,

    /// Retry configuration
    #[serde(default)]
    pub retry_config: Option<RetryConfig>,

    /// Timeout in milliseconds
    #[serde(default)]
    pub timeout_ms: Option<i32>,
}

/// Action configuration for an observer.
///
/// Serialized as an internally tagged object: a `type` field holds the
/// snake_case variant name (`webhook`, `email`, `slack`, `database`, `log`)
/// and the variant's fields sit alongside it, e.g.
/// `{"type": "webhook", "url": "https://example.com/hook"}`.
///
/// Marked `#[non_exhaustive]` so that new action types can be added in future
/// minor versions without breaking downstream exhaustive `match` expressions.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ActionConfig {
    /// HTTP webhook action
    Webhook {
        /// Target URL for the webhook POST request.
        url:           String,
        /// HTTP method (default: POST).
        ///
        /// Kept as a free-form string; this type does not validate it.
        #[serde(default = "default_method")]
        method:        String,
        /// Optional custom request headers.
        #[serde(default)]
        headers:       Option<std::collections::HashMap<String, String>>,
        /// Optional Handlebars body template.
        #[serde(default)]
        body_template: Option<String>,
    },

    /// Email notification action
    Email {
        /// Primary recipient address.
        to:               String,
        /// Optional CC recipient address.
        #[serde(default)]
        cc:               Option<String>,
        /// Handlebars template for the email subject.
        subject_template: String,
        /// Handlebars template for the email body.
        body_template:    String,
    },

    /// Slack message action
    Slack {
        /// Incoming webhook URL for the Slack workspace.
        webhook_url:      String,
        /// Optional target channel override.
        #[serde(default)]
        channel:          Option<String>,
        /// Handlebars template for the Slack message text.
        message_template: String,
    },

    /// Database function call
    Database {
        /// Name of the PostgreSQL function to invoke.
        function_name: String,
        /// Optional JSON parameters passed to the function.
        #[serde(default)]
        params:        Option<serde_json::Value>,
    },

    /// Log action (for debugging)
    Log {
        /// Log level: "trace", "debug", "info", "warn", or "error".
        ///
        /// Falls back to "info" when omitted. Kept as a free-form string;
        /// this type does not reject other values.
        #[serde(default = "default_log_level")]
        level:            String,
        /// Handlebars template for the log message.
        message_template: String,
    },
}

/// Retry configuration for observer actions.
///
/// Each field has its own serde default, so a partial object (even `{}`)
/// deserializes successfully. The per-field defaults carry the same values as
/// the `Default` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (default: 3)
    #[serde(default = "default_max_attempts")]
    pub max_attempts: i32,

    /// Backoff strategy: "fixed", "linear", "exponential"
    ///
    /// Defaults to "exponential". Kept as a free-form string; this type does
    /// not reject other values.
    #[serde(default = "default_backoff")]
    pub backoff: String,

    /// Initial delay in milliseconds (default: 1000)
    #[serde(default = "default_initial_delay")]
    pub initial_delay_ms: i64,

    /// Maximum delay in milliseconds (default: 60000)
    #[serde(default = "default_max_delay")]
    pub max_delay_ms: i64,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts:     3,
            backoff:          "exponential".to_string(),
            initial_delay_ms: 1000,
            max_delay_ms:     60000,
        }
    }
}

/// Observer execution log entry.
///
/// Derives `FromRow`, so it can be built directly from a query result whose
/// columns carry the same names as the fields below.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct ObserverLog {
    /// Internal primary key
    pub pk_observer_log: i64,

    /// External UUID
    pub id: Uuid,

    /// Reference to the observer
    ///
    /// Presumably the observer's internal `pk_observer` (both are `i64`) —
    /// TODO confirm against the schema.
    pub fk_observer: i64,

    /// Event ID that triggered this execution
    pub event_id: Uuid,

    /// Entity type
    pub entity_type: String,

    /// Entity ID
    pub entity_id: Uuid,

    /// Event type (INSERT, UPDATE, DELETE)
    pub event_type: String,

    /// Execution status
    ///
    /// Kept as a free-form string; the set of valid values is not defined in
    /// this module.
    pub status: String,

    /// Action index in the actions array
    pub action_index: Option<i32>,

    /// Action type
    pub action_type: Option<String>,

    /// When execution started
    pub started_at: Option<DateTime<Utc>>,

    /// When execution completed
    pub completed_at: Option<DateTime<Utc>>,

    /// Duration in milliseconds
    pub duration_ms: Option<i32>,

    /// Error code (if failed)
    pub error_code: Option<String>,

    /// Error message (if failed)
    pub error_message: Option<String>,

    /// Retry attempt number
    pub attempt_number: i32,

    /// Trace ID for distributed tracing
    pub trace_id: Option<String>,

    /// When the log entry was created
    pub created_at: DateTime<Utc>,
}

/// Observer statistics from `vw_observer_stats` view.
///
/// Derives `FromRow`, so the view's column names are expected to match the
/// field names below. The aggregate fields typed as `Option` can be absent;
/// presumably that is the case for observers with no recorded executions —
/// TODO confirm against the view definition.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct ObserverStats {
    /// Internal primary key of the observer.
    pub pk_observer:           i64,
    /// External UUID of the observer.
    pub observer_id:           Uuid,
    /// Name of the observer.
    pub observer_name:         String,
    /// Entity type filter (if set on the observer).
    pub entity_type:           Option<String>,
    /// Event type filter (if set on the observer).
    pub event_type:            Option<String>,
    /// Whether the observer is currently enabled.
    pub enabled:               bool,
    /// Total number of executions recorded.
    pub total_executions:      i64,
    /// Number of executions that completed successfully.
    pub successful_executions: i64,
    /// Number of executions that ended in failure.
    pub failed_executions:     i64,
    /// Number of executions that timed out.
    pub timeout_executions:    i64,
    /// Number of executions that were skipped (condition not met).
    pub skipped_executions:    i64,
    /// Percentage of successful executions (0–100).
    pub success_rate_pct:      Option<f64>,
    /// Average execution duration in milliseconds.
    pub avg_duration_ms:       Option<f64>,
    /// Maximum execution duration in milliseconds.
    pub max_duration_ms:       Option<i32>,
    /// Minimum execution duration in milliseconds.
    pub min_duration_ms:       Option<i32>,
    /// Timestamp of the most recent execution.
    pub last_execution_at:     Option<DateTime<Utc>>,
}

/// Query parameters for listing observers.
///
/// All parameters are optional when deserializing: filters default to `None`,
/// `include_deleted` to `false`, `page` to 1 and `page_size` to 20.
///
/// NOTE(review): `page` and `page_size` are signed and this type performs no
/// range validation, so zero or negative values deserialize successfully —
/// verify that callers clamp or reject them before using them for pagination.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListObserversQuery {
    /// Filter by entity type
    #[serde(default)]
    pub entity_type: Option<String>,

    /// Filter by event type
    #[serde(default)]
    pub event_type: Option<String>,

    /// Filter by enabled status
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Include deleted observers
    ///
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub include_deleted: bool,

    /// Page number (1-based)
    #[serde(default = "default_page")]
    pub page: i64,

    /// Page size
    ///
    /// Defaults to 20 when omitted.
    #[serde(default = "default_page_size")]
    pub page_size: i64,
}

/// Query parameters for listing observer logs.
///
/// All parameters are optional when deserializing: filters default to `None`,
/// `page` to 1 and `page_size` to 20.
///
/// NOTE(review): `page` and `page_size` are signed and this type performs no
/// range validation, so zero or negative values deserialize successfully —
/// verify that callers clamp or reject them before using them for pagination.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListObserverLogsQuery {
    /// Filter by observer ID
    #[serde(default)]
    pub observer_id: Option<Uuid>,

    /// Filter by status
    #[serde(default)]
    pub status: Option<String>,

    /// Filter by event ID
    #[serde(default)]
    pub event_id: Option<Uuid>,

    /// Filter by trace ID
    #[serde(default)]
    pub trace_id: Option<String>,

    /// Page number (1-based)
    #[serde(default = "default_page")]
    pub page: i64,

    /// Page size
    ///
    /// Defaults to 20 when omitted.
    #[serde(default = "default_page_size")]
    pub page_size: i64,
}

/// Paginated response wrapper.
///
/// Generic over the item type `T`. All fields are public, so a value can be
/// assembled by hand; the `new` constructor exists to derive `total_pages`
/// from `total_count` and `page_size`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaginatedResponse<T> {
    /// Items on the current page.
    pub data:        Vec<T>,
    /// Current page number (1-based).
    pub page:        i64,
    /// Number of items per page.
    pub page_size:   i64,
    /// Total number of items across all pages.
    pub total_count: i64,
    /// Total number of pages.
    pub total_pages: i64,
}

impl<T> PaginatedResponse<T> {
    /// Construct a paginated response from a page of data and the total item count.
    ///
    /// `total_pages` is the ceiling of `total_count / page_size`.
    ///
    /// The constructor never panics: a non-positive `page_size` or
    /// `total_count` yields `total_pages == 0` instead of dividing by zero or
    /// producing a negative page count, and the ceiling division is written so
    /// that it cannot overflow even for `total_count == i64::MAX`.
    ///
    /// `page`, `page_size` and `total_count` are stored exactly as given.
    #[must_use]
    pub const fn new(data: Vec<T>, page: i64, page_size: i64, total_count: i64) -> Self {
        let total_pages = if page_size <= 0 || total_count <= 0 {
            // Nothing to paginate, or no meaningful page size to divide by.
            0
        } else {
            // Quotient plus one extra page for a non-zero remainder. Unlike
            // `(total_count + page_size - 1) / page_size`, this has no
            // intermediate sum that could exceed `i64::MAX`.
            let full_pages = total_count / page_size;
            if total_count % page_size == 0 {
                full_pages
            } else {
                full_pages + 1
            }
        };
        Self {
            data,
            page,
            page_size,
            total_count,
            total_pages,
        }
    }
}

// Default value functions for serde
/// Serde default that yields `true`; used for `CreateObserverRequest::enabled`.
const fn default_true() -> bool {
    const DEFAULT: bool = true;
    DEFAULT
}
/// Serde default for `CreateObserverRequest::priority`.
const fn default_priority() -> i32 {
    const DEFAULT_PRIORITY: i32 = 100;
    DEFAULT_PRIORITY
}
/// Serde default for `CreateObserverRequest::timeout_ms`, in milliseconds.
const fn default_timeout() -> i32 {
    30_000
}
/// Serde default for the `method` field of `ActionConfig::Webhook`.
fn default_method() -> String {
    String::from("POST")
}
/// Serde default for the `level` field of `ActionConfig::Log`.
fn default_log_level() -> String {
    String::from("info")
}
/// Serde default for `RetryConfig::max_attempts`.
const fn default_max_attempts() -> i32 {
    const DEFAULT_MAX_ATTEMPTS: i32 = 3;
    DEFAULT_MAX_ATTEMPTS
}
/// Serde default for `RetryConfig::backoff`.
fn default_backoff() -> String {
    String::from("exponential")
}
/// Serde default for `RetryConfig::initial_delay_ms`, in milliseconds.
const fn default_initial_delay() -> i64 {
    1_000
}
/// Serde default for `RetryConfig::max_delay_ms`, in milliseconds.
const fn default_max_delay() -> i64 {
    60_000
}
/// Serde default for the 1-based `page` field of the list query structs.
const fn default_page() -> i64 {
    const FIRST_PAGE: i64 = 1;
    FIRST_PAGE
}
/// Serde default for the `page_size` field of the list query structs.
const fn default_page_size() -> i64 {
    const DEFAULT_PAGE_SIZE: i64 = 20;
    DEFAULT_PAGE_SIZE
}