Skip to main content

teaql_runtime/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::{BTreeMap, HashMap};
3use std::future::Future;
4
5use std::pin::Pin;
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use teaql_core::{EntityDescriptor, Record, UpdateCommand, Value};
10use teaql_sql::{CompiledQuery, DatabaseKind};
11
12use crate::{
13    CheckResults, CheckerRegistry, ContextError, RawAuditEvent, RawAuditEventSink, GraphNode,
14    InternalIdGenerator, Language, MetadataStore, ObjectLocation, RepositoryBehavior,
15    RepositoryBehaviorRegistry, RepositoryRegistry, RequestPolicy, RuntimeError,
16    local_id_generator, translate_check_result,
17};
18use crate::{EntityRoot, RepositoryError};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum SqlLogOperation {
22    Select,
23    Insert,
24    Update,
25    Delete,
26    Recover,
27}
28
29impl SqlLogOperation {
30    pub fn is_select(self) -> bool {
31        matches!(self, Self::Select)
32    }
33
34    pub fn is_mutation(self) -> bool {
35        !self.is_select()
36    }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
40pub struct SqlLogOptions {
41    pub select: bool,
42    pub mutation: bool,
43}
44
45impl SqlLogOptions {
46    pub fn disabled() -> Self {
47        Self {
48            select: false,
49            mutation: false,
50        }
51    }
52
53    pub fn select_only() -> Self {
54        Self {
55            select: true,
56            mutation: false,
57        }
58    }
59
60    pub fn mutation_only() -> Self {
61        Self {
62            select: false,
63            mutation: true,
64        }
65    }
66
67    pub fn all() -> Self {
68        Self {
69            select: true,
70            mutation: true,
71        }
72    }
73
74    pub fn enabled_for(self, operation: SqlLogOperation) -> bool {
75        if operation.is_select() {
76            self.select
77        } else {
78            self.mutation
79        }
80    }
81}
82
83#[derive(Debug, Clone, PartialEq)]
84pub struct SqlLogEntry {
85    pub operation: SqlLogOperation,
86    pub sql: String,
87    pub params: Vec<Value>,
88    pub debug_sql: String,
89    pub pretty_sql: String,
90    pub started_at: SystemTime,
91    pub ended_at: SystemTime,
92    pub elapsed: Duration,
93    pub result_count: Option<usize>,
94    pub result_type: Option<String>,
95    pub affected_rows: Option<u64>,
96    pub result_summary: String,
97}
98
99#[derive(Debug, Clone, PartialEq)]
100pub struct UnifiedLogEntry {
101    pub timestamp: SystemTime,
102    pub user_identifier: Option<String>,
103    pub trace_chain: Vec<teaql_core::TraceNode>,
104    pub payload: LogPayload,
105}
106
107#[derive(Debug, Clone, PartialEq)]
108pub enum LogPayload {
109    Sql(SqlLogEntry),
110    Info(InfoLogEntry),
111}
112
113#[derive(Debug, Clone, PartialEq)]
114pub struct InfoLogEntry {
115    pub message: String,
116}
117
118#[derive(Clone, Default)]
119pub struct UnifiedLogBuffer {
120    pub entries: std::sync::Arc<Mutex<Vec<UnifiedLogEntry>>>,
121}
122
123pub trait SchemaProvider: Send + Sync {
124    fn ensure_schema<'a>(
125        &'a self,
126        ctx: &'a UserContext,
127    ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
128}
129
130pub struct UserContext {
131    pub(crate) metadata: Option<Box<dyn MetadataStore>>,
132    pub(crate) repository_registry: Option<Box<dyn RepositoryRegistry>>,
133    pub(crate) repository_behavior_registry: Option<Box<dyn RepositoryBehaviorRegistry>>,
134    pub(crate) request_policy: Option<Box<dyn RequestPolicy>>,
135    pub(crate) checker_registry: Option<Box<dyn CheckerRegistry>>,
136    pub(crate) event_sink: Option<Box<dyn RawAuditEventSink>>,
137    pub(crate) custom_event_sink: Option<Box<dyn crate::SafeAuditEventSink>>,
138    pub(crate) internal_id_generator: Option<Box<dyn InternalIdGenerator>>,
139    schema_provider: Option<Box<dyn SchemaProvider>>,
140    language: Language,
141    typed_resources: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
142    named_resources: BTreeMap<String, Box<dyn Any + Send + Sync>>,
143    locals: BTreeMap<String, Value>,
144    pub(crate) initial_graphs: Vec<GraphNode>,
145    entity_root: EntityRoot,
146    sql_log_options: SqlLogOptions,
147    sql_log_entries: Mutex<Vec<SqlLogEntry>>,
148    user_identifier: Option<String>,
149    timezone: Option<String>,
150    trace_id: String,
151}
152
153impl Default for UserContext {
154    fn default() -> Self {
155        let pid = std::process::id();
156        let thread_id_str = format!("{:?}", std::thread::current().id());
157        let numeric_thread_id = thread_id_str
158            .strip_prefix("ThreadId(")
159            .and_then(|s| s.strip_suffix(")"))
160            .unwrap_or(&thread_id_str);
161        let os_user = std::env::var("USER")
162            .or_else(|_| std::env::var("USERNAME"))
163            .unwrap_or_else(|_| "main".to_owned());
164        let user_id = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
165        Self {
166            metadata: None,
167            repository_registry: None,
168            repository_behavior_registry: None,
169            request_policy: None,
170            checker_registry: None,
171            event_sink: None,
172            custom_event_sink: None,
173            internal_id_generator: None,
174            schema_provider: None,
175            language: Language::default(),
176            typed_resources: HashMap::new(),
177            named_resources: BTreeMap::new(),
178            locals: BTreeMap::new(),
179            initial_graphs: Vec::new(),
180            entity_root: EntityRoot::default(),
181            sql_log_options: SqlLogOptions::all(),
182            sql_log_entries: Mutex::new(Vec::new()),
183            user_identifier: Some(user_id),
184            timezone: Some("UTC".to_owned()),
185            trace_id: format!("req-{pid}-{numeric_thread_id}-{:x}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_micros()),
186        }
187    }
188}
189
190#[async_trait::async_trait]
191pub trait DataStore: Send + Sync + 'static {
192    async fn get(&self, key: &str) -> Option<Value>;
193    async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>);
194    async fn remove(&self, key: &str);
195}
196
197#[derive(Default)]
198pub struct InMemoryDataStore {
199    cache: std::sync::RwLock<HashMap<String, (Value, Option<std::time::Instant>)>>,
200}
201
202#[async_trait::async_trait]
203impl DataStore for InMemoryDataStore {
204    async fn get(&self, key: &str) -> Option<Value> {
205        let lock = self.cache.read().unwrap();
206        if let Some((val, expires_at)) = lock.get(key) {
207            if let Some(exp) = expires_at {
208                if std::time::Instant::now() > *exp {
209                    return None;
210                }
211            }
212            return Some(val.clone());
213        }
214        None
215    }
216
217    async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>) {
218        let mut lock = self.cache.write().unwrap();
219        let expires_at = timeout_seconds.map(|secs| std::time::Instant::now() + std::time::Duration::from_secs(secs));
220        lock.insert(key.to_string(), (value, expires_at));
221    }
222
223    async fn remove(&self, key: &str) {
224        let mut lock = self.cache.write().unwrap();
225        lock.remove(key);
226    }
227}
228
229impl UserContext {
230    pub fn new() -> Self {
231        Self::default()
232    }
233
234    pub fn user_identifier(&self) -> Option<&str> {
235        self.user_identifier.as_deref()
236    }
237
238    pub fn set_user_identifier(&mut self, user_identifier: impl Into<String>) {
239        self.user_identifier = Some(user_identifier.into());
240    }
241
242    pub fn with_user_identifier(mut self, user_identifier: impl Into<String>) -> Self {
243        self.user_identifier = Some(user_identifier.into());
244        self
245    }
246
247    pub fn set_user_identifier_option(&mut self, user_identifier: Option<String>) {
248        self.user_identifier = user_identifier;
249    }
250
251    pub fn with_user_identifier_option(mut self, user_identifier: Option<String>) -> Self {
252        self.user_identifier = user_identifier;
253        self
254    }
255
256    pub fn timezone(&self) -> Option<&str> {
257        self.timezone.as_deref()
258    }
259
260    pub fn set_timezone(&mut self, timezone: impl Into<String>) {
261        self.timezone = Some(timezone.into());
262    }
263
264    pub fn with_timezone(mut self, timezone: impl Into<String>) -> Self {
265        self.timezone = Some(timezone.into());
266        self
267    }
268
269    pub fn trace_id(&self) -> &str {
270        &self.trace_id
271    }
272
273    pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
274        self.trace_id = trace_id.into();
275    }
276
277    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
278        self.trace_id = trace_id.into();
279        self
280    }
281
282    pub fn with_module(mut self, module: crate::RuntimeModule) -> Self {
283        module.apply_to(&mut self);
284        self
285    }
286
287    pub fn entity_root(&self) -> EntityRoot {
288        self.entity_root.clone()
289    }
290
291    pub fn initial_graphs(&self) -> &[GraphNode] {
292        &self.initial_graphs
293    }
294
295    pub fn set_initial_graphs(&mut self, graphs: Vec<GraphNode>) {
296        self.initial_graphs = graphs;
297    }
298
299    pub fn with_metadata(mut self, metadata: impl MetadataStore + 'static) -> Self {
300        self.metadata = Some(Box::new(metadata));
301        self
302    }
303
304    pub fn set_metadata(&mut self, metadata: impl MetadataStore + 'static) {
305        self.metadata = Some(Box::new(metadata));
306    }
307
308    pub fn with_repository_registry(mut self, registry: impl RepositoryRegistry + 'static) -> Self {
309        self.repository_registry = Some(Box::new(registry));
310        self
311    }
312
313    pub fn set_repository_registry(&mut self, registry: impl RepositoryRegistry + 'static) {
314        self.repository_registry = Some(Box::new(registry));
315    }
316
317    pub fn with_repository_behavior_registry(
318        mut self,
319        registry: impl RepositoryBehaviorRegistry + 'static,
320    ) -> Self {
321        self.repository_behavior_registry = Some(Box::new(registry));
322        self
323    }
324
325    pub fn set_repository_behavior_registry(
326        &mut self,
327        registry: impl RepositoryBehaviorRegistry + 'static,
328    ) {
329        self.repository_behavior_registry = Some(Box::new(registry));
330    }
331
332    pub fn with_request_policy(mut self, policy: impl RequestPolicy + 'static) -> Self {
333        self.request_policy = Some(Box::new(policy));
334        self
335    }
336
337    pub fn set_request_policy(&mut self, policy: impl RequestPolicy + 'static) {
338        self.request_policy = Some(Box::new(policy));
339    }
340
341    pub fn clear_request_policy(&mut self) {
342        self.request_policy = None;
343    }
344
345    pub fn with_checker_registry(mut self, registry: impl CheckerRegistry + 'static) -> Self {
346        self.checker_registry = Some(Box::new(registry));
347        self
348    }
349
350    pub fn set_checker_registry(&mut self, registry: impl CheckerRegistry + 'static) {
351        self.checker_registry = Some(Box::new(registry));
352    }
353
354    pub(crate) fn with_event_sink(mut self, sink: impl RawAuditEventSink + 'static) -> Self {
355        self.event_sink = Some(Box::new(sink));
356        self
357    }
358
359    pub(crate) fn set_event_sink(&mut self, sink: impl RawAuditEventSink + 'static) {
360        self.event_sink = Some(Box::new(sink));
361    }
362
363    pub fn with_custom_event_sink(mut self, sink: impl crate::SafeAuditEventSink + 'static) -> Self {
364        self.custom_event_sink = Some(Box::new(sink));
365        self
366    }
367
368    pub fn set_custom_event_sink(&mut self, sink: impl crate::SafeAuditEventSink + 'static) {
369        self.custom_event_sink = Some(Box::new(sink));
370    }
371
372    pub fn with_internal_id_generator(
373        mut self,
374        generator: impl InternalIdGenerator + 'static,
375    ) -> Self {
376        self.internal_id_generator = Some(Box::new(generator));
377        self
378    }
379
380    pub fn set_internal_id_generator(&mut self, generator: impl InternalIdGenerator + 'static) {
381        self.internal_id_generator = Some(Box::new(generator));
382    }
383
384    pub fn with_schema_provider(mut self, provider: impl SchemaProvider + 'static) -> Self {
385        self.schema_provider = Some(Box::new(provider));
386        self
387    }
388
389    pub fn set_schema_provider(&mut self, provider: impl SchemaProvider + 'static) {
390        self.schema_provider = Some(Box::new(provider));
391    }
392
393    pub async fn ensure_schema(&self) -> Result<(), RuntimeError> {
394        let provider = self
395            .schema_provider
396            .as_ref()
397            .ok_or_else(|| RuntimeError::Schema("missing schema provider".to_owned()))?;
398        provider.ensure_schema(self).await
399    }
400
401    pub fn with_language(mut self, language: Language) -> Self {
402        self.language = language;
403        self
404    }
405
406    pub fn set_language(&mut self, language: Language) {
407        self.language = language;
408    }
409
410    pub fn with_sql_log_options(mut self, options: SqlLogOptions) -> Self {
411        self.sql_log_options = options;
412        self
413    }
414
415    pub fn set_sql_log_options(&mut self, options: SqlLogOptions) {
416        self.sql_log_options = options;
417    }
418
419    pub fn enable_select_sql_log(&mut self) {
420        self.sql_log_options.select = true;
421    }
422
423    pub fn enable_mutation_sql_log(&mut self) {
424        self.sql_log_options.mutation = true;
425    }
426
427    pub fn enable_all_sql_log(&mut self) {
428        self.sql_log_options = SqlLogOptions::all();
429    }
430
431    pub fn disable_sql_log(&mut self) {
432        self.sql_log_options = SqlLogOptions::disabled();
433        self.clear_sql_logs();
434    }
435
436    pub fn sql_log_options(&self) -> SqlLogOptions {
437        self.sql_log_options
438    }
439
440    pub fn sql_logs(&self) -> Vec<SqlLogEntry> {
441        self.sql_log_entries
442            .lock()
443            .map(|entries| entries.clone())
444            .unwrap_or_default()
445    }
446
447    pub fn clear_sql_logs(&self) {
448        if let Ok(mut entries) = self.sql_log_entries.lock() {
449            entries.clear();
450        }
451    }
452
453    pub(crate) fn record_sql_log(
454        &self,
455        operation: SqlLogOperation,
456        query: &CompiledQuery,
457        database_kind: DatabaseKind,
458        started_at: SystemTime,
459        ended_at: SystemTime,
460        elapsed: Duration,
461        result_count: Option<usize>,
462        result_type: Option<String>,
463        affected_rows: Option<u64>,
464        trace_chain: Vec<teaql_core::TraceNode>,
465    ) {
466        if !self.sql_log_options.enabled_for(operation) {
467            return;
468        }
469        let debug_sql = query.debug_sql(database_kind);
470        let result_summary = sql_result_summary(
471            operation,
472            result_count,
473            result_type.as_deref(),
474            affected_rows,
475            &debug_sql,
476        );
477
478        let sql_log_entry = SqlLogEntry {
479            operation,
480            sql: query.sql.clone(),
481            params: query.params.clone(),
482            pretty_sql: pretty_sql(&debug_sql),
483            debug_sql: debug_sql.clone(),
484            started_at,
485            ended_at,
486            elapsed,
487            result_summary: result_summary.clone(),
488            result_count,
489            result_type,
490            affected_rows,
491        };
492
493        if let Ok(mut entries) = self.sql_log_entries.lock() {
494            // Keep sql_log_entries backwards-compatible for now if needed,
495            // wait, we modified SqlLogEntry. We can just push it directly since we removed comment.
496            // Wait, we need to push a cloned SqlLogEntry since it doesn't have comment.
497            entries.push(sql_log_entry.clone());
498        }
499
500        if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
501            if let Ok(mut entries) = buf.entries.lock() {
502                entries.push(UnifiedLogEntry {
503                    timestamp: started_at,
504                    user_identifier: self.user_identifier.clone(),
505                    trace_chain: trace_chain.clone(),
506                    payload: LogPayload::Sql(sql_log_entry.clone()),
507                });
508            }
509        }
510        
511        crate::log_formatter::LogManager::write_sql_log(&trace_chain, &sql_log_entry);
512    }
513
514    pub(crate) fn record_metadata_log(&self, metadata: &teaql_data_service::ExecutionMetadata) {
515        if let Some(debug_sql) = &metadata.debug_query {
516            let sql_log_entry = SqlLogEntry {
517                operation: match metadata.operation {
518                    teaql_data_service::DataServiceOperation::Query => SqlLogOperation::Select,
519                    teaql_data_service::DataServiceOperation::Insert => SqlLogOperation::Insert,
520                    teaql_data_service::DataServiceOperation::Update => SqlLogOperation::Update,
521                    teaql_data_service::DataServiceOperation::Delete => SqlLogOperation::Delete,
522                    teaql_data_service::DataServiceOperation::Recover => SqlLogOperation::Update, // Approximate
523                    teaql_data_service::DataServiceOperation::Batch => SqlLogOperation::Update,
524                    teaql_data_service::DataServiceOperation::Schema => SqlLogOperation::Update,
525                },
526                sql: String::new(), // Not available in metadata
527                params: Vec::new(), // Not available in metadata
528                pretty_sql: pretty_sql(debug_sql),
529                debug_sql: debug_sql.clone(),
530                started_at: metadata.started_at,
531                ended_at: metadata.ended_at,
532                elapsed: metadata.ended_at.duration_since(metadata.started_at).unwrap_or_default(),
533                result_count: metadata.result_count,
534                result_type: None, // Not directly available
535                affected_rows: metadata.affected_rows,
536                result_summary: String::new(), // We can synthesize this if needed, or leave it empty/basic
537            };
538
539            // synthesize a summary for the log
540            let mut summary = String::new();
541            if let Some(c) = metadata.result_count {
542                summary = format!("{} rows returned", c);
543            } else if let Some(a) = metadata.affected_rows {
544                summary = format!("{} rows affected", a);
545            }
546
547            let mut final_entry = sql_log_entry;
548            final_entry.result_summary = summary;
549
550            if let Ok(mut entries) = self.sql_log_entries.lock() {
551                entries.push(final_entry.clone());
552            }
553
554            if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
555                if let Ok(mut entries) = buf.entries.lock() {
556                    entries.push(UnifiedLogEntry {
557                        timestamp: metadata.started_at,
558                        user_identifier: self.user_identifier.clone(),
559                        trace_chain: metadata.trace_chain.clone(),
560                        payload: LogPayload::Sql(final_entry.clone()),
561                    });
562                }
563            }
564            
565            crate::log_formatter::LogManager::write_sql_log(&metadata.trace_chain, &final_entry);
566        }
567    }
568
569    pub fn language(&self) -> Language {
570        self.language
571    }
572
573    pub fn set_language_code(&mut self, code: &str) -> Result<(), RuntimeError> {
574        let Some(language) = Language::from_code(code) else {
575            return Err(RuntimeError::Language(format!(
576                "unsupported language code: {code}"
577            )));
578        };
579        self.language = language;
580        Ok(())
581    }
582
583    pub fn generate_id(&self, entity: &str) -> Result<Option<u64>, RuntimeError> {
584        self.internal_id_generator
585            .as_ref()
586            .map(|generator| generator.generate_id(entity))
587            .transpose()
588    }
589
590    pub fn next_id(&self, entity: &str) -> Result<u64, RuntimeError> {
591        match self.generate_id(entity)? {
592            Some(id) => Ok(id),
593            None => local_id_generator().generate_id(entity),
594        }
595    }
596
597    pub fn entity(&self, name: &str) -> Option<&EntityDescriptor> {
598        self.metadata
599            .as_ref()
600            .and_then(|metadata| metadata.entity(name))
601    }
602
603    pub fn all_entities(&self) -> Vec<&EntityDescriptor> {
604        self.metadata
605            .as_ref()
606            .map(|metadata| metadata.all_entities())
607            .unwrap_or_default()
608    }
609
610    pub fn require_entity(&self, name: &str) -> Result<&EntityDescriptor, RuntimeError> {
611        self.entity(name)
612            .ok_or_else(|| RuntimeError::MissingEntity(name.to_owned()))
613    }
614
615    pub fn insert_resource<T>(&mut self, resource: T)
616    where
617        T: Send + Sync + 'static,
618    {
619        self.typed_resources
620            .insert(TypeId::of::<T>(), Box::new(resource));
621    }
622
623    pub fn get_resource<T>(&self) -> Option<&T>
624    where
625        T: Send + Sync + 'static,
626    {
627        self.typed_resources
628            .get(&TypeId::of::<T>())
629            .and_then(|value| value.downcast_ref::<T>())
630    }
631
632    pub fn require_resource<T>(&self) -> Result<&T, ContextError>
633    where
634        T: Send + Sync + 'static,
635    {
636        self.get_resource::<T>()
637            .ok_or(ContextError::MissingTypedResource(
638                std::any::type_name::<T>(),
639            ))
640    }
641
642    pub fn insert_named_resource<T>(&mut self, name: impl Into<String>, resource: T)
643    where
644        T: Send + Sync + 'static,
645    {
646        self.named_resources.insert(name.into(), Box::new(resource));
647    }
648
649    pub fn get_named_resource<T>(&self, name: &str) -> Option<&T>
650    where
651        T: Send + Sync + 'static,
652    {
653        self.named_resources
654            .get(name)
655            .and_then(|value| value.downcast_ref::<T>())
656    }
657
658    pub fn require_named_resource<T>(&self, name: &str) -> Result<&T, ContextError>
659    where
660        T: Send + Sync + 'static,
661    {
662        self.get_named_resource::<T>(name)
663            .ok_or_else(|| ContextError::MissingResource(name.to_owned()))
664    }
665
666    pub fn put_local(&mut self, key: impl Into<String>, value: impl Into<Value>) {
667        self.locals.insert(key.into(), value.into());
668    }
669
670    pub fn local(&self, key: &str) -> Option<&Value> {
671        self.locals.get(key)
672    }
673
674    pub fn remove_local(&mut self, key: &str) -> Option<Value> {
675        self.locals.remove(key)
676    }
677
678    pub fn has_repository(&self, entity: &str) -> bool {
679        let in_registry = self
680            .repository_registry
681            .as_ref()
682            .map(|registry| registry.contains(entity))
683            .unwrap_or(false);
684        in_registry || self.entity(entity).is_some()
685    }
686
687    pub fn repository_behavior(
688        &self,
689        entity: &str,
690    ) -> Option<std::sync::Arc<dyn RepositoryBehavior>> {
691        self.repository_behavior_registry
692            .as_ref()
693            .and_then(|registry| registry.behavior(entity))
694    }
695
696    pub fn has_checker(&self, entity: &str) -> bool {
697        self.checker_registry
698            .as_ref()
699            .and_then(|registry| registry.checker(entity))
700            .is_some()
701    }
702
703    pub fn check_and_fix_record(
704        &self,
705        entity: &str,
706        record: &mut Record,
707    ) -> Result<(), RuntimeError> {
708        self.check_and_fix_record_at(entity, record, &ObjectLocation::root())
709    }
710
711    pub fn check_and_fix_record_at(
712        &self,
713        entity: &str,
714        record: &mut Record,
715        location: &ObjectLocation,
716    ) -> Result<(), RuntimeError> {
717        let Some(checker) = self
718            .checker_registry
719            .as_ref()
720            .and_then(|registry| registry.checker(entity))
721        else {
722            return Ok(());
723        };
724        let mut results = CheckResults::new();
725        checker.check_and_fix(self, record, location, &mut results);
726        if results.is_empty() {
727            Ok(())
728        } else {
729            self.translate_check_results(&mut results);
730            Err(RuntimeError::Check(results))
731        }
732    }
733
734    pub fn translate_check_results(&self, results: &mut CheckResults) {
735        for result in results {
736            result.message = Some(translate_check_result(self.language, result));
737        }
738    }
739
740    pub fn send_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
741        if let Some(sink) = self.event_sink.as_ref() {
742            sink.on_event(self, &event)?;
743        }
744        if let Some(sink) = self.custom_event_sink.as_ref() {
745            let (mask_fields, max_len) = if let Some(metadata) = &self.metadata {
746                if let Some(desc) = metadata.entity(&event.entity) {
747                    (desc.audit_mask_fields.clone(), desc.audit_value_max_len)
748                } else {
749                    (vec![], None)
750                }
751            } else {
752                (vec![], None)
753            };
754            
755            let safe_event = event.build_safe_event(&mask_fields, max_len);
756            sink.on_safe_event(self, &safe_event)?;
757        }
758        
759        crate::log_formatter::LogManager::write_audit_log(&event);
760        
761        Ok(())
762    }
763
764    pub async fn commit_changes<E>(&self) -> Result<(), RepositoryError<E::Error>>
765    where
766        E: teaql_data_service::MutationExecutor + Send + Sync + 'static,
767    {
768        let executor = self.require_resource::<E>().map_err(|err| {
769            RepositoryError::Runtime(RuntimeError::Graph(format!(
770                "cannot commit changes without executor: {err}"
771            )))
772        })?;
773        let change_set = self.entity_root.current_change_set();
774
775        for (key, changes) in change_set.changes() {
776            if changes.is_empty() {
777                continue;
778            }
779            let _entity = self
780                .require_entity(&key.entity)
781                .map_err(RepositoryError::Runtime)?;
782            let mut command = UpdateCommand::new(&key.entity, key.id.clone());
783            for (field, value) in changes {
784                command = command.value(field.clone(), value.clone());
785            }
786            let request = teaql_data_service::MutationRequest::Update(command);
787            executor
788                .mutate(request).await
789                .map_err(RepositoryError::Executor)?;
790        }
791
792        self.entity_root.clear_current_change_set();
793        Ok(())
794    }
795
796    pub async fn get_in_store(&self, key: &str) -> Option<Value> {
797        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
798            store.get(key).await
799        } else {
800            None
801        }
802    }
803
804    pub async fn put_in_store(&self, key: &str, value: impl Into<Value>, timeout_seconds: Option<u64>) {
805        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
806            store.put(key, value.into(), timeout_seconds).await;
807        }
808    }
809
810    pub async fn clear_in_store(&self, key: &str) {
811        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
812            store.remove(key).await;
813        }
814    }
815}
816
817fn extract_id_from_sql(sql: &str) -> Option<String> {
818    let sql_lower = sql.to_lowercase();
819    let where_idx = sql_lower.find("where")?;
820    let where_clause = &sql_lower[where_idx + 5..];
821    
822    let bytes = where_clause.as_bytes();
823    let mut i = 0;
824    while i < bytes.len() {
825        if i + 1 < bytes.len() && &bytes[i..i+2] == b"id" {
826            // Check boundary before
827            let prev_ok = if i == 0 {
828                true
829            } else {
830                let prev_char = bytes[i - 1] as char;
831                !prev_char.is_ascii_alphanumeric() && prev_char != '_' && prev_char != '.'
832            };
833            // Check boundary after
834            let next_ok = if i + 2 == bytes.len() {
835                true
836            } else {
837                let next_char = bytes[i + 2] as char;
838                !next_char.is_ascii_alphanumeric() && next_char != '_'
839            };
840            
841            if prev_ok && next_ok {
842                // Found the standalone "id" word!
843                // Now look for "=" after it
844                let mut j = i + 2;
845                while j < bytes.len() && (bytes[j] as char).is_whitespace() {
846                    j += 1;
847                }
848                if j < bytes.len() && bytes[j] == b'=' {
849                    j += 1;
850                    while j < bytes.len() && (bytes[j] as char).is_whitespace() {
851                        j += 1;
852                      }
853                      // Now extract the value
854                      let mut val_str = String::new();
855                      if j < bytes.len() && bytes[j] == b'\'' {
856                          j += 1; // consume single quote
857                          while j < bytes.len() && bytes[j] != b'\'' {
858                              val_str.push(bytes[j] as char);
859                              j += 1;
860                          }
861                          return Some(val_str);
862                      } else {
863                          while j < bytes.len() {
864                              let c = bytes[j] as char;
865                              if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
866                                  val_str.push(c);
867                                  j += 1;
868                              } else {
869                                  break;
870                              }
871                          }
872                          if !val_str.is_empty() {
873                              return Some(val_str);
874                          }
875                      }
876                }
877            }
878        }
879        i += 1;
880    }
881    None
882}
883
884fn sql_result_summary(
885    operation: SqlLogOperation,
886    result_count: Option<usize>,
887    result_type: Option<&str>,
888    affected_rows: Option<u64>,
889    debug_sql: &str,
890) -> String {
891    match operation {
892        SqlLogOperation::Select => {
893            let count = result_count.unwrap_or(0);
894            if count == 0 {
895                "MISS".to_owned()
896            } else if count > 1 {
897                match result_type {
898                    Some(result_type) => format!("{count}*{result_type}"),
899                    None => format!("{count}*rows"),
900                }
901            } else {
902                match result_type {
903                    Some(result_type) => {
904                        if let Some(id) = extract_id_from_sql(debug_sql) {
905                            format!("{result_type}({id})")
906                        } else {
907                            result_type.to_owned()
908                        }
909                    }
910                    None => "row".to_owned(),
911                }
912            }
913        }
914        _ => {
915            let affected = affected_rows.unwrap_or(0);
916            format!("{affected} UPDATED")
917        }
918    }
919}
920
921fn pretty_sql(sql: &str) -> String {
922    let mut pretty = sql.to_owned();
923    for keyword in [
924        " FROM ",
925        " WHERE ",
926        " GROUP BY ",
927        " HAVING ",
928        " ORDER BY ",
929        " LIMIT ",
930        " OFFSET ",
931        " RETURNING ",
932    ] {
933        pretty = pretty.replace(keyword, &format!("\n{}", keyword.trim_start()));
934    }
935    pretty
936        .replace(" AND ", "\n  AND ")
937}