Skip to main content

teaql_runtime/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::{BTreeMap, HashMap};
3use std::future::Future;
4
5use std::pin::Pin;
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use teaql_core::{EntityDescriptor, Record, UpdateCommand, Value};
10use teaql_sql::{CompiledQuery, DatabaseKind};
11
12use crate::{
13    CheckResults, CheckerRegistry, ContextError, RawAuditEvent, RawAuditEventSink, GraphNode,
14    InternalIdGenerator, Language, MetadataStore, ObjectLocation, RepositoryBehavior,
15    RepositoryBehaviorRegistry, RepositoryRegistry, RequestPolicy, RuntimeError,
16    local_id_generator, translate_check_result,
17};
18use crate::{EntityRoot, RepositoryError};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum SqlLogOperation {
22    Select,
23    Insert,
24    Update,
25    Delete,
26    Recover,
27}
28
29impl SqlLogOperation {
30    pub fn is_select(self) -> bool {
31        matches!(self, Self::Select)
32    }
33
34    pub fn is_mutation(self) -> bool {
35        !self.is_select()
36    }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
40pub struct SqlLogOptions {
41    pub select: bool,
42    pub mutation: bool,
43}
44
45impl SqlLogOptions {
46    pub fn disabled() -> Self {
47        Self {
48            select: false,
49            mutation: false,
50        }
51    }
52
53    pub fn select_only() -> Self {
54        Self {
55            select: true,
56            mutation: false,
57        }
58    }
59
60    pub fn mutation_only() -> Self {
61        Self {
62            select: false,
63            mutation: true,
64        }
65    }
66
67    pub fn all() -> Self {
68        Self {
69            select: true,
70            mutation: true,
71        }
72    }
73
74    pub fn enabled_for(self, operation: SqlLogOperation) -> bool {
75        if operation.is_select() {
76            self.select
77        } else {
78            self.mutation
79        }
80    }
81}
82
83#[derive(Debug, Clone, PartialEq)]
84pub struct SqlLogEntry {
85    pub operation: SqlLogOperation,
86    pub sql: String,
87    pub params: Vec<Value>,
88    pub debug_sql: String,
89    pub pretty_sql: String,
90    pub started_at: SystemTime,
91    pub ended_at: SystemTime,
92    pub elapsed: Duration,
93    pub result_count: Option<usize>,
94    pub result_type: Option<String>,
95    pub affected_rows: Option<u64>,
96    pub result_summary: String,
97}
98
99#[derive(Debug, Clone, PartialEq)]
100pub struct UnifiedLogEntry {
101    pub timestamp: SystemTime,
102    pub user_identifier: Option<String>,
103    pub trace_chain: Vec<teaql_core::TraceNode>,
104    pub payload: LogPayload,
105}
106
107#[derive(Debug, Clone, PartialEq)]
108pub enum LogPayload {
109    Sql(SqlLogEntry),
110    Info(InfoLogEntry),
111}
112
113#[derive(Debug, Clone, PartialEq)]
114pub struct InfoLogEntry {
115    pub message: String,
116}
117
118#[derive(Clone, Default)]
119pub struct UnifiedLogBuffer {
120    pub entries: std::sync::Arc<Mutex<Vec<UnifiedLogEntry>>>,
121}
122
123pub trait SchemaProvider: Send + Sync {
124    fn ensure_schema<'a>(
125        &'a self,
126        ctx: &'a UserContext,
127    ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
128}
129
130pub struct UserContext {
131    pub(crate) metadata: Option<Box<dyn MetadataStore>>,
132    pub(crate) repository_registry: Option<Box<dyn RepositoryRegistry>>,
133    pub(crate) repository_behavior_registry: Option<Box<dyn RepositoryBehaviorRegistry>>,
134    pub(crate) request_policy: Option<Box<dyn RequestPolicy>>,
135    pub(crate) checker_registry: Option<Box<dyn CheckerRegistry>>,
136    pub(crate) event_sink: Option<Box<dyn RawAuditEventSink>>,
137    pub(crate) custom_event_sink: Option<Box<dyn crate::SafeAuditEventSink>>,
138    pub(crate) internal_id_generator: Option<Box<dyn InternalIdGenerator>>,
139    schema_provider: Option<Box<dyn SchemaProvider>>,
140    language: Language,
141    typed_resources: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
142    named_resources: BTreeMap<String, Box<dyn Any + Send + Sync>>,
143    locals: BTreeMap<String, Value>,
144    pub(crate) initial_graphs: Vec<GraphNode>,
145    entity_root: EntityRoot,
146    sql_log_options: SqlLogOptions,
147    sql_log_entries: Mutex<Vec<SqlLogEntry>>,
148    user_identifier: Option<String>,
149    timezone: Option<String>,
150    trace_id: String,
151}
152
153impl Default for UserContext {
154    fn default() -> Self {
155        let pid = std::process::id();
156        let thread_id_str = format!("{:?}", std::thread::current().id());
157        let numeric_thread_id = thread_id_str
158            .strip_prefix("ThreadId(")
159            .and_then(|s| s.strip_suffix(")"))
160            .unwrap_or(&thread_id_str);
161        let os_user = std::env::var("USER")
162            .or_else(|_| std::env::var("USERNAME"))
163            .unwrap_or_else(|_| "main".to_owned());
164        let user_id = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
165        Self {
166            metadata: None,
167            repository_registry: None,
168            repository_behavior_registry: None,
169            request_policy: None,
170            checker_registry: None,
171            event_sink: None,
172            custom_event_sink: None,
173            internal_id_generator: None,
174            schema_provider: None,
175            language: Language::default(),
176            typed_resources: HashMap::new(),
177            named_resources: BTreeMap::new(),
178            locals: BTreeMap::new(),
179            initial_graphs: Vec::new(),
180            entity_root: EntityRoot::default(),
181            sql_log_options: SqlLogOptions::all(),
182            sql_log_entries: Mutex::new(Vec::new()),
183            user_identifier: Some(user_id),
184            timezone: Some("UTC".to_owned()),
185            trace_id: format!("req-{pid}-{numeric_thread_id}-{:x}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_micros()),
186        }
187    }
188}
189
190#[async_trait::async_trait]
191pub trait DataStore: Send + Sync + 'static {
192    async fn get(&self, key: &str) -> Option<Value>;
193    async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>);
194    async fn remove(&self, key: &str);
195}
196
197#[derive(Default)]
198pub struct InMemoryDataStore {
199    cache: std::sync::RwLock<HashMap<String, (Value, Option<std::time::Instant>)>>,
200}
201
202#[async_trait::async_trait]
203impl DataStore for InMemoryDataStore {
204    async fn get(&self, key: &str) -> Option<Value> {
205        let lock = self.cache.read().unwrap();
206        if let Some((val, expires_at)) = lock.get(key) {
207            if let Some(exp) = expires_at {
208                if std::time::Instant::now() > *exp {
209                    return None;
210                }
211            }
212            return Some(val.clone());
213        }
214        None
215    }
216
217    async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>) {
218        let mut lock = self.cache.write().unwrap();
219        let expires_at = timeout_seconds.map(|secs| std::time::Instant::now() + std::time::Duration::from_secs(secs));
220        lock.insert(key.to_string(), (value, expires_at));
221    }
222
223    async fn remove(&self, key: &str) {
224        let mut lock = self.cache.write().unwrap();
225        lock.remove(key);
226    }
227}
228
229impl UserContext {
230    pub fn new() -> Self {
231        Self::default()
232    }
233
234    pub fn user_identifier(&self) -> Option<&str> {
235        self.user_identifier.as_deref()
236    }
237
238    pub fn set_user_identifier(&mut self, user_identifier: impl Into<String>) {
239        self.user_identifier = Some(user_identifier.into());
240    }
241
242    pub fn with_user_identifier(mut self, user_identifier: impl Into<String>) -> Self {
243        self.user_identifier = Some(user_identifier.into());
244        self
245    }
246
247    pub fn set_user_identifier_option(&mut self, user_identifier: Option<String>) {
248        self.user_identifier = user_identifier;
249    }
250
251    pub fn with_user_identifier_option(mut self, user_identifier: Option<String>) -> Self {
252        self.user_identifier = user_identifier;
253        self
254    }
255
256    pub fn timezone(&self) -> Option<&str> {
257        self.timezone.as_deref()
258    }
259
260    pub fn set_timezone(&mut self, timezone: impl Into<String>) {
261        self.timezone = Some(timezone.into());
262    }
263
264    pub fn with_timezone(mut self, timezone: impl Into<String>) -> Self {
265        self.timezone = Some(timezone.into());
266        self
267    }
268
269    pub fn trace_id(&self) -> &str {
270        &self.trace_id
271    }
272
273    pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
274        self.trace_id = trace_id.into();
275    }
276
277    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
278        self.trace_id = trace_id.into();
279        self
280    }
281
282    pub fn with_module(mut self, module: crate::RuntimeModule) -> Self {
283        module.apply_to(&mut self);
284        self
285    }
286
287    pub fn entity_root(&self) -> EntityRoot {
288        self.entity_root.clone()
289    }
290
291    pub fn initial_graphs(&self) -> &[GraphNode] {
292        &self.initial_graphs
293    }
294
295    pub fn set_initial_graphs(&mut self, graphs: Vec<GraphNode>) {
296        self.initial_graphs = graphs;
297    }
298
299    pub fn with_metadata(mut self, metadata: impl MetadataStore + 'static) -> Self {
300        self.metadata = Some(Box::new(metadata));
301        self
302    }
303
304    pub fn set_metadata(&mut self, metadata: impl MetadataStore + 'static) {
305        self.metadata = Some(Box::new(metadata));
306    }
307
308    pub fn with_repository_registry(mut self, registry: impl RepositoryRegistry + 'static) -> Self {
309        self.repository_registry = Some(Box::new(registry));
310        self
311    }
312
313    pub fn set_repository_registry(&mut self, registry: impl RepositoryRegistry + 'static) {
314        self.repository_registry = Some(Box::new(registry));
315    }
316
317    pub fn with_repository_behavior_registry(
318        mut self,
319        registry: impl RepositoryBehaviorRegistry + 'static,
320    ) -> Self {
321        self.repository_behavior_registry = Some(Box::new(registry));
322        self
323    }
324
325    pub fn set_repository_behavior_registry(
326        &mut self,
327        registry: impl RepositoryBehaviorRegistry + 'static,
328    ) {
329        self.repository_behavior_registry = Some(Box::new(registry));
330    }
331
332    pub fn with_request_policy(mut self, policy: impl RequestPolicy + 'static) -> Self {
333        self.request_policy = Some(Box::new(policy));
334        self
335    }
336
337    pub fn set_request_policy(&mut self, policy: impl RequestPolicy + 'static) {
338        self.request_policy = Some(Box::new(policy));
339    }
340
341    pub fn clear_request_policy(&mut self) {
342        self.request_policy = None;
343    }
344
345    pub fn with_checker_registry(mut self, registry: impl CheckerRegistry + 'static) -> Self {
346        self.checker_registry = Some(Box::new(registry));
347        self
348    }
349
350    pub fn set_checker_registry(&mut self, registry: impl CheckerRegistry + 'static) {
351        self.checker_registry = Some(Box::new(registry));
352    }
353
354    pub(crate) fn with_event_sink(mut self, sink: impl RawAuditEventSink + 'static) -> Self {
355        self.event_sink = Some(Box::new(sink));
356        self
357    }
358
359    pub(crate) fn set_event_sink(&mut self, sink: impl RawAuditEventSink + 'static) {
360        self.event_sink = Some(Box::new(sink));
361    }
362
363    pub fn with_custom_event_sink(mut self, sink: impl crate::SafeAuditEventSink + 'static) -> Self {
364        self.custom_event_sink = Some(Box::new(sink));
365        self
366    }
367
368    pub fn set_custom_event_sink(&mut self, sink: impl crate::SafeAuditEventSink + 'static) {
369        self.custom_event_sink = Some(Box::new(sink));
370    }
371
372    pub fn with_internal_id_generator(
373        mut self,
374        generator: impl InternalIdGenerator + 'static,
375    ) -> Self {
376        self.internal_id_generator = Some(Box::new(generator));
377        self
378    }
379
380    pub fn set_internal_id_generator(&mut self, generator: impl InternalIdGenerator + 'static) {
381        self.internal_id_generator = Some(Box::new(generator));
382    }
383
384    pub fn with_schema_provider(mut self, provider: impl SchemaProvider + 'static) -> Self {
385        self.schema_provider = Some(Box::new(provider));
386        self
387    }
388
389    pub fn set_schema_provider(&mut self, provider: impl SchemaProvider + 'static) {
390        self.schema_provider = Some(Box::new(provider));
391    }
392
393    pub async fn ensure_schema(&self) -> Result<(), RuntimeError> {
394        let provider = self
395            .schema_provider
396            .as_ref()
397            .ok_or_else(|| RuntimeError::Schema("missing schema provider".to_owned()))?;
398        provider.ensure_schema(self).await
399    }
400
401    pub fn with_language(mut self, language: Language) -> Self {
402        self.language = language;
403        self
404    }
405
406    pub fn set_language(&mut self, language: Language) {
407        self.language = language;
408    }
409
410    pub fn with_sql_log_options(mut self, options: SqlLogOptions) -> Self {
411        self.sql_log_options = options;
412        self
413    }
414
415    pub fn set_sql_log_options(&mut self, options: SqlLogOptions) {
416        self.sql_log_options = options;
417    }
418
419    pub fn enable_select_sql_log(&mut self) {
420        self.sql_log_options.select = true;
421    }
422
423    pub fn enable_mutation_sql_log(&mut self) {
424        self.sql_log_options.mutation = true;
425    }
426
427    pub fn enable_all_sql_log(&mut self) {
428        self.sql_log_options = SqlLogOptions::all();
429    }
430
431    pub fn disable_sql_log(&mut self) {
432        self.sql_log_options = SqlLogOptions::disabled();
433        self.clear_sql_logs();
434    }
435
436    pub fn sql_log_options(&self) -> SqlLogOptions {
437        self.sql_log_options
438    }
439
440    pub fn sql_logs(&self) -> Vec<SqlLogEntry> {
441        self.sql_log_entries
442            .lock()
443            .map(|entries| entries.clone())
444            .unwrap_or_default()
445    }
446
447    pub fn clear_sql_logs(&self) {
448        if let Ok(mut entries) = self.sql_log_entries.lock() {
449            entries.clear();
450        }
451    }
452
453    pub(crate) fn record_sql_log(
454        &self,
455        operation: SqlLogOperation,
456        query: &CompiledQuery,
457        database_kind: DatabaseKind,
458        started_at: SystemTime,
459        ended_at: SystemTime,
460        elapsed: Duration,
461        result_count: Option<usize>,
462        result_type: Option<String>,
463        affected_rows: Option<u64>,
464        trace_chain: Vec<teaql_core::TraceNode>,
465    ) {
466        if !self.sql_log_options.enabled_for(operation) {
467            return;
468        }
469        let debug_sql = query.debug_sql(database_kind);
470        let result_summary = sql_result_summary(
471            operation,
472            result_count,
473            result_type.as_deref(),
474            affected_rows,
475            &debug_sql,
476        );
477
478        let sql_log_entry = SqlLogEntry {
479            operation,
480            sql: query.sql.clone(),
481            params: query.params.clone(),
482            pretty_sql: pretty_sql(&debug_sql),
483            debug_sql: debug_sql.clone(),
484            started_at,
485            ended_at,
486            elapsed,
487            result_summary: result_summary.clone(),
488            result_count,
489            result_type,
490            affected_rows,
491        };
492
493        if let Ok(mut entries) = self.sql_log_entries.lock() {
494            // Keep sql_log_entries backwards-compatible for now if needed,
495            // wait, we modified SqlLogEntry. We can just push it directly since we removed comment.
496            // Wait, we need to push a cloned SqlLogEntry since it doesn't have comment.
497            entries.push(sql_log_entry.clone());
498        }
499
500        if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
501            if let Ok(mut entries) = buf.entries.lock() {
502                entries.push(UnifiedLogEntry {
503                    timestamp: started_at,
504                    user_identifier: self.user_identifier.clone(),
505                    trace_chain,
506                    payload: LogPayload::Sql(sql_log_entry),
507                });
508            }
509        }
510    }
511
512    pub(crate) fn record_metadata_log(&self, metadata: &teaql_data_service::ExecutionMetadata) {
513        if let Some(debug_sql) = &metadata.debug_query {
514            let sql_log_entry = SqlLogEntry {
515                operation: match metadata.operation {
516                    teaql_data_service::DataServiceOperation::Query => SqlLogOperation::Select,
517                    teaql_data_service::DataServiceOperation::Insert => SqlLogOperation::Insert,
518                    teaql_data_service::DataServiceOperation::Update => SqlLogOperation::Update,
519                    teaql_data_service::DataServiceOperation::Delete => SqlLogOperation::Delete,
520                    teaql_data_service::DataServiceOperation::Recover => SqlLogOperation::Update, // Approximate
521                    teaql_data_service::DataServiceOperation::Batch => SqlLogOperation::Update,
522                    teaql_data_service::DataServiceOperation::Schema => SqlLogOperation::Update,
523                },
524                sql: String::new(), // Not available in metadata
525                params: Vec::new(), // Not available in metadata
526                pretty_sql: pretty_sql(debug_sql),
527                debug_sql: debug_sql.clone(),
528                started_at: metadata.started_at,
529                ended_at: metadata.ended_at,
530                elapsed: metadata.ended_at.duration_since(metadata.started_at).unwrap_or_default(),
531                result_count: metadata.result_count,
532                result_type: None, // Not directly available
533                affected_rows: metadata.affected_rows,
534                result_summary: String::new(), // We can synthesize this if needed, or leave it empty/basic
535            };
536
537            // synthesize a summary for the log
538            let mut summary = String::new();
539            if let Some(c) = metadata.result_count {
540                summary = format!("{} rows returned", c);
541            } else if let Some(a) = metadata.affected_rows {
542                summary = format!("{} rows affected", a);
543            }
544
545            let mut final_entry = sql_log_entry;
546            final_entry.result_summary = summary;
547
548            if let Ok(mut entries) = self.sql_log_entries.lock() {
549                entries.push(final_entry.clone());
550            }
551
552            if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
553                if let Ok(mut entries) = buf.entries.lock() {
554                    entries.push(UnifiedLogEntry {
555                        timestamp: metadata.started_at,
556                        user_identifier: self.user_identifier.clone(),
557                        trace_chain: metadata.trace_chain.clone(),
558                        payload: LogPayload::Sql(final_entry),
559                    });
560                }
561            }
562        }
563    }
564
565    pub fn language(&self) -> Language {
566        self.language
567    }
568
569    pub fn set_language_code(&mut self, code: &str) -> Result<(), RuntimeError> {
570        let Some(language) = Language::from_code(code) else {
571            return Err(RuntimeError::Language(format!(
572                "unsupported language code: {code}"
573            )));
574        };
575        self.language = language;
576        Ok(())
577    }
578
579    pub fn generate_id(&self, entity: &str) -> Result<Option<u64>, RuntimeError> {
580        self.internal_id_generator
581            .as_ref()
582            .map(|generator| generator.generate_id(entity))
583            .transpose()
584    }
585
586    pub fn next_id(&self, entity: &str) -> Result<u64, RuntimeError> {
587        match self.generate_id(entity)? {
588            Some(id) => Ok(id),
589            None => local_id_generator().generate_id(entity),
590        }
591    }
592
593    pub fn entity(&self, name: &str) -> Option<&EntityDescriptor> {
594        self.metadata
595            .as_ref()
596            .and_then(|metadata| metadata.entity(name))
597    }
598
599    pub fn all_entities(&self) -> Vec<&EntityDescriptor> {
600        self.metadata
601            .as_ref()
602            .map(|metadata| metadata.all_entities())
603            .unwrap_or_default()
604    }
605
606    pub fn require_entity(&self, name: &str) -> Result<&EntityDescriptor, RuntimeError> {
607        self.entity(name)
608            .ok_or_else(|| RuntimeError::MissingEntity(name.to_owned()))
609    }
610
611    pub fn insert_resource<T>(&mut self, resource: T)
612    where
613        T: Send + Sync + 'static,
614    {
615        self.typed_resources
616            .insert(TypeId::of::<T>(), Box::new(resource));
617    }
618
619    pub fn get_resource<T>(&self) -> Option<&T>
620    where
621        T: Send + Sync + 'static,
622    {
623        self.typed_resources
624            .get(&TypeId::of::<T>())
625            .and_then(|value| value.downcast_ref::<T>())
626    }
627
628    pub fn require_resource<T>(&self) -> Result<&T, ContextError>
629    where
630        T: Send + Sync + 'static,
631    {
632        self.get_resource::<T>()
633            .ok_or(ContextError::MissingTypedResource(
634                std::any::type_name::<T>(),
635            ))
636    }
637
638    pub fn insert_named_resource<T>(&mut self, name: impl Into<String>, resource: T)
639    where
640        T: Send + Sync + 'static,
641    {
642        self.named_resources.insert(name.into(), Box::new(resource));
643    }
644
645    pub fn get_named_resource<T>(&self, name: &str) -> Option<&T>
646    where
647        T: Send + Sync + 'static,
648    {
649        self.named_resources
650            .get(name)
651            .and_then(|value| value.downcast_ref::<T>())
652    }
653
654    pub fn require_named_resource<T>(&self, name: &str) -> Result<&T, ContextError>
655    where
656        T: Send + Sync + 'static,
657    {
658        self.get_named_resource::<T>(name)
659            .ok_or_else(|| ContextError::MissingResource(name.to_owned()))
660    }
661
662    pub fn put_local(&mut self, key: impl Into<String>, value: impl Into<Value>) {
663        self.locals.insert(key.into(), value.into());
664    }
665
666    pub fn local(&self, key: &str) -> Option<&Value> {
667        self.locals.get(key)
668    }
669
670    pub fn remove_local(&mut self, key: &str) -> Option<Value> {
671        self.locals.remove(key)
672    }
673
674    pub fn has_repository(&self, entity: &str) -> bool {
675        let in_registry = self
676            .repository_registry
677            .as_ref()
678            .map(|registry| registry.contains(entity))
679            .unwrap_or(false);
680        in_registry || self.entity(entity).is_some()
681    }
682
683    pub fn repository_behavior(
684        &self,
685        entity: &str,
686    ) -> Option<std::sync::Arc<dyn RepositoryBehavior>> {
687        self.repository_behavior_registry
688            .as_ref()
689            .and_then(|registry| registry.behavior(entity))
690    }
691
692    pub fn has_checker(&self, entity: &str) -> bool {
693        self.checker_registry
694            .as_ref()
695            .and_then(|registry| registry.checker(entity))
696            .is_some()
697    }
698
699    pub fn check_and_fix_record(
700        &self,
701        entity: &str,
702        record: &mut Record,
703    ) -> Result<(), RuntimeError> {
704        self.check_and_fix_record_at(entity, record, &ObjectLocation::root())
705    }
706
707    pub fn check_and_fix_record_at(
708        &self,
709        entity: &str,
710        record: &mut Record,
711        location: &ObjectLocation,
712    ) -> Result<(), RuntimeError> {
713        let Some(checker) = self
714            .checker_registry
715            .as_ref()
716            .and_then(|registry| registry.checker(entity))
717        else {
718            return Ok(());
719        };
720        let mut results = CheckResults::new();
721        checker.check_and_fix(self, record, location, &mut results);
722        if results.is_empty() {
723            Ok(())
724        } else {
725            self.translate_check_results(&mut results);
726            Err(RuntimeError::Check(results))
727        }
728    }
729
730    pub fn translate_check_results(&self, results: &mut CheckResults) {
731        for result in results {
732            result.message = Some(translate_check_result(self.language, result));
733        }
734    }
735
736    pub fn send_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
737        if let Some(sink) = self.event_sink.as_ref() {
738            sink.on_event(self, &event)?;
739        }
740        if let Some(sink) = self.custom_event_sink.as_ref() {
741            let (mask_fields, max_len) = if let Some(metadata) = &self.metadata {
742                if let Some(desc) = metadata.entity(&event.entity) {
743                    (desc.audit_mask_fields.clone(), desc.audit_value_max_len)
744                } else {
745                    (vec![], None)
746                }
747            } else {
748                (vec![], None)
749            };
750            
751            let safe_event = event.build_safe_event(&mask_fields, max_len);
752            sink.on_safe_event(self, &safe_event)?;
753        }
754        Ok(())
755    }
756
757    pub async fn commit_changes<E>(&self) -> Result<(), RepositoryError<E::Error>>
758    where
759        E: teaql_data_service::MutationExecutor + Send + Sync + 'static,
760    {
761        let executor = self.require_resource::<E>().map_err(|err| {
762            RepositoryError::Runtime(RuntimeError::Graph(format!(
763                "cannot commit changes without executor: {err}"
764            )))
765        })?;
766        let change_set = self.entity_root.current_change_set();
767
768        for (key, changes) in change_set.changes() {
769            if changes.is_empty() {
770                continue;
771            }
772            let _entity = self
773                .require_entity(&key.entity)
774                .map_err(RepositoryError::Runtime)?;
775            let mut command = UpdateCommand::new(&key.entity, key.id.clone());
776            for (field, value) in changes {
777                command = command.value(field.clone(), value.clone());
778            }
779            let request = teaql_data_service::MutationRequest::Update(command);
780            executor
781                .mutate(request).await
782                .map_err(RepositoryError::Executor)?;
783        }
784
785        self.entity_root.clear_current_change_set();
786        Ok(())
787    }
788
789    pub async fn get_in_store(&self, key: &str) -> Option<Value> {
790        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
791            store.get(key).await
792        } else {
793            None
794        }
795    }
796
797    pub async fn put_in_store(&self, key: &str, value: impl Into<Value>, timeout_seconds: Option<u64>) {
798        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
799            store.put(key, value.into(), timeout_seconds).await;
800        }
801    }
802
803    pub async fn clear_in_store(&self, key: &str) {
804        if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
805            store.remove(key).await;
806        }
807    }
808}
809
810fn extract_id_from_sql(sql: &str) -> Option<String> {
811    let sql_lower = sql.to_lowercase();
812    let where_idx = sql_lower.find("where")?;
813    let where_clause = &sql_lower[where_idx + 5..];
814    
815    let bytes = where_clause.as_bytes();
816    let mut i = 0;
817    while i < bytes.len() {
818        if i + 1 < bytes.len() && &bytes[i..i+2] == b"id" {
819            // Check boundary before
820            let prev_ok = if i == 0 {
821                true
822            } else {
823                let prev_char = bytes[i - 1] as char;
824                !prev_char.is_ascii_alphanumeric() && prev_char != '_' && prev_char != '.'
825            };
826            // Check boundary after
827            let next_ok = if i + 2 == bytes.len() {
828                true
829            } else {
830                let next_char = bytes[i + 2] as char;
831                !next_char.is_ascii_alphanumeric() && next_char != '_'
832            };
833            
834            if prev_ok && next_ok {
835                // Found the standalone "id" word!
836                // Now look for "=" after it
837                let mut j = i + 2;
838                while j < bytes.len() && (bytes[j] as char).is_whitespace() {
839                    j += 1;
840                }
841                if j < bytes.len() && bytes[j] == b'=' {
842                    j += 1;
843                    while j < bytes.len() && (bytes[j] as char).is_whitespace() {
844                        j += 1;
845                      }
846                      // Now extract the value
847                      let mut val_str = String::new();
848                      if j < bytes.len() && bytes[j] == b'\'' {
849                          j += 1; // consume single quote
850                          while j < bytes.len() && bytes[j] != b'\'' {
851                              val_str.push(bytes[j] as char);
852                              j += 1;
853                          }
854                          return Some(val_str);
855                      } else {
856                          while j < bytes.len() {
857                              let c = bytes[j] as char;
858                              if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
859                                  val_str.push(c);
860                                  j += 1;
861                              } else {
862                                  break;
863                              }
864                          }
865                          if !val_str.is_empty() {
866                              return Some(val_str);
867                          }
868                      }
869                }
870            }
871        }
872        i += 1;
873    }
874    None
875}
876
877fn sql_result_summary(
878    operation: SqlLogOperation,
879    result_count: Option<usize>,
880    result_type: Option<&str>,
881    affected_rows: Option<u64>,
882    debug_sql: &str,
883) -> String {
884    match operation {
885        SqlLogOperation::Select => {
886            let count = result_count.unwrap_or(0);
887            if count == 0 {
888                "MISS".to_owned()
889            } else if count > 1 {
890                match result_type {
891                    Some(result_type) => format!("{count}*{result_type}"),
892                    None => format!("{count}*rows"),
893                }
894            } else {
895                match result_type {
896                    Some(result_type) => {
897                        if let Some(id) = extract_id_from_sql(debug_sql) {
898                            format!("{result_type}({id})")
899                        } else {
900                            result_type.to_owned()
901                        }
902                    }
903                    None => "row".to_owned(),
904                }
905            }
906        }
907        _ => {
908            let affected = affected_rows.unwrap_or(0);
909            format!("{affected} UPDATED")
910        }
911    }
912}
913
914fn pretty_sql(sql: &str) -> String {
915    let mut pretty = sql.to_owned();
916    for keyword in [
917        " FROM ",
918        " WHERE ",
919        " GROUP BY ",
920        " HAVING ",
921        " ORDER BY ",
922        " LIMIT ",
923        " OFFSET ",
924        " RETURNING ",
925    ] {
926        pretty = pretty.replace(keyword, &format!("\n{}", keyword.trim_start()));
927    }
928    pretty
929        .replace(" AND ", "\n  AND ")
930        .replace(" OR ", "\n  OR ")
931}
932
933