1use std::any::{Any, TypeId};
2use std::collections::{BTreeMap, HashMap};
3use std::future::Future;
4
5use std::pin::Pin;
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use teaql_core::{EntityDescriptor, Record, UpdateCommand, Value};
10use teaql_sql::{CompiledQuery, DatabaseKind};
11
12use crate::{
13 CheckResults, CheckerRegistry, ContextError, RawAuditEvent, RawAuditEventSink, GraphNode,
14 InternalIdGenerator, Language, MetadataStore, ObjectLocation, RepositoryBehavior,
15 RepositoryBehaviorRegistry, RepositoryRegistry, RequestPolicy, RuntimeError,
16 local_id_generator, translate_check_result,
17};
18use crate::{EntityRoot, RepositoryError};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum SqlLogOperation {
22 Select,
23 Insert,
24 Update,
25 Delete,
26 Recover,
27}
28
29impl SqlLogOperation {
30 pub fn is_select(self) -> bool {
31 matches!(self, Self::Select)
32 }
33
34 pub fn is_mutation(self) -> bool {
35 !self.is_select()
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
40pub struct SqlLogOptions {
41 pub select: bool,
42 pub mutation: bool,
43}
44
45impl SqlLogOptions {
46 pub fn disabled() -> Self {
47 Self {
48 select: false,
49 mutation: false,
50 }
51 }
52
53 pub fn select_only() -> Self {
54 Self {
55 select: true,
56 mutation: false,
57 }
58 }
59
60 pub fn mutation_only() -> Self {
61 Self {
62 select: false,
63 mutation: true,
64 }
65 }
66
67 pub fn all() -> Self {
68 Self {
69 select: true,
70 mutation: true,
71 }
72 }
73
74 pub fn enabled_for(self, operation: SqlLogOperation) -> bool {
75 if operation.is_select() {
76 self.select
77 } else {
78 self.mutation
79 }
80 }
81}
82
83#[derive(Debug, Clone, PartialEq)]
84pub struct SqlLogEntry {
85 pub operation: SqlLogOperation,
86 pub sql: String,
87 pub params: Vec<Value>,
88 pub debug_sql: String,
89 pub pretty_sql: String,
90 pub started_at: SystemTime,
91 pub ended_at: SystemTime,
92 pub elapsed: Duration,
93 pub result_count: Option<usize>,
94 pub result_type: Option<String>,
95 pub affected_rows: Option<u64>,
96 pub result_summary: String,
97}
98
99#[derive(Debug, Clone, PartialEq)]
100pub struct UnifiedLogEntry {
101 pub timestamp: SystemTime,
102 pub user_identifier: Option<String>,
103 pub trace_chain: Vec<teaql_core::TraceNode>,
104 pub payload: LogPayload,
105}
106
107#[derive(Debug, Clone, PartialEq)]
108pub enum LogPayload {
109 Sql(SqlLogEntry),
110 Info(InfoLogEntry),
111}
112
113#[derive(Debug, Clone, PartialEq)]
114pub struct InfoLogEntry {
115 pub message: String,
116}
117
118#[derive(Clone, Default)]
119pub struct UnifiedLogBuffer {
120 pub entries: std::sync::Arc<Mutex<Vec<UnifiedLogEntry>>>,
121}
122
123pub trait SchemaProvider: Send + Sync {
124 fn ensure_schema<'a>(
125 &'a self,
126 ctx: &'a UserContext,
127 ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
128}
129
130pub struct UserContext {
131 pub(crate) metadata: Option<Box<dyn MetadataStore>>,
132 pub(crate) repository_registry: Option<Box<dyn RepositoryRegistry>>,
133 pub(crate) repository_behavior_registry: Option<Box<dyn RepositoryBehaviorRegistry>>,
134 pub(crate) request_policy: Option<Box<dyn RequestPolicy>>,
135 pub(crate) checker_registry: Option<Box<dyn CheckerRegistry>>,
136 pub(crate) event_sink: Option<Box<dyn RawAuditEventSink>>,
137 pub(crate) custom_event_sink: Option<Box<dyn crate::SafeAuditEventSink>>,
138 pub(crate) internal_id_generator: Option<Box<dyn InternalIdGenerator>>,
139 schema_provider: Option<Box<dyn SchemaProvider>>,
140 language: Language,
141 typed_resources: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
142 named_resources: BTreeMap<String, Box<dyn Any + Send + Sync>>,
143 locals: BTreeMap<String, Value>,
144 pub(crate) initial_graphs: Vec<GraphNode>,
145 entity_root: EntityRoot,
146 sql_log_options: SqlLogOptions,
147 sql_log_entries: Mutex<Vec<SqlLogEntry>>,
148 user_identifier: Option<String>,
149 timezone: Option<String>,
150 trace_id: String,
151}
152
153impl Default for UserContext {
154 fn default() -> Self {
155 let pid = std::process::id();
156 let thread_id_str = format!("{:?}", std::thread::current().id());
157 let numeric_thread_id = thread_id_str
158 .strip_prefix("ThreadId(")
159 .and_then(|s| s.strip_suffix(")"))
160 .unwrap_or(&thread_id_str);
161 let os_user = std::env::var("USER")
162 .or_else(|_| std::env::var("USERNAME"))
163 .unwrap_or_else(|_| "main".to_owned());
164 let user_id = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
165 Self {
166 metadata: None,
167 repository_registry: None,
168 repository_behavior_registry: None,
169 request_policy: None,
170 checker_registry: None,
171 event_sink: None,
172 custom_event_sink: None,
173 internal_id_generator: None,
174 schema_provider: None,
175 language: Language::default(),
176 typed_resources: HashMap::new(),
177 named_resources: BTreeMap::new(),
178 locals: BTreeMap::new(),
179 initial_graphs: Vec::new(),
180 entity_root: EntityRoot::default(),
181 sql_log_options: SqlLogOptions::all(),
182 sql_log_entries: Mutex::new(Vec::new()),
183 user_identifier: Some(user_id),
184 timezone: Some("UTC".to_owned()),
185 trace_id: format!("req-{pid}-{numeric_thread_id}-{:x}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_micros()),
186 }
187 }
188}
189
190#[async_trait::async_trait]
191pub trait DataStore: Send + Sync + 'static {
192 async fn get(&self, key: &str) -> Option<Value>;
193 async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>);
194 async fn remove(&self, key: &str);
195}
196
197#[derive(Default)]
198pub struct InMemoryDataStore {
199 cache: std::sync::RwLock<HashMap<String, (Value, Option<std::time::Instant>)>>,
200}
201
202#[async_trait::async_trait]
203impl DataStore for InMemoryDataStore {
204 async fn get(&self, key: &str) -> Option<Value> {
205 let lock = self.cache.read().unwrap();
206 if let Some((val, expires_at)) = lock.get(key) {
207 if let Some(exp) = expires_at {
208 if std::time::Instant::now() > *exp {
209 return None;
210 }
211 }
212 return Some(val.clone());
213 }
214 None
215 }
216
217 async fn put(&self, key: &str, value: Value, timeout_seconds: Option<u64>) {
218 let mut lock = self.cache.write().unwrap();
219 let expires_at = timeout_seconds.map(|secs| std::time::Instant::now() + std::time::Duration::from_secs(secs));
220 lock.insert(key.to_string(), (value, expires_at));
221 }
222
223 async fn remove(&self, key: &str) {
224 let mut lock = self.cache.write().unwrap();
225 lock.remove(key);
226 }
227}
228
229impl UserContext {
230 pub fn new() -> Self {
231 Self::default()
232 }
233
234 pub fn user_identifier(&self) -> Option<&str> {
235 self.user_identifier.as_deref()
236 }
237
238 pub fn set_user_identifier(&mut self, user_identifier: impl Into<String>) {
239 self.user_identifier = Some(user_identifier.into());
240 }
241
242 pub fn with_user_identifier(mut self, user_identifier: impl Into<String>) -> Self {
243 self.user_identifier = Some(user_identifier.into());
244 self
245 }
246
247 pub fn set_user_identifier_option(&mut self, user_identifier: Option<String>) {
248 self.user_identifier = user_identifier;
249 }
250
251 pub fn with_user_identifier_option(mut self, user_identifier: Option<String>) -> Self {
252 self.user_identifier = user_identifier;
253 self
254 }
255
256 pub fn timezone(&self) -> Option<&str> {
257 self.timezone.as_deref()
258 }
259
260 pub fn set_timezone(&mut self, timezone: impl Into<String>) {
261 self.timezone = Some(timezone.into());
262 }
263
264 pub fn with_timezone(mut self, timezone: impl Into<String>) -> Self {
265 self.timezone = Some(timezone.into());
266 self
267 }
268
269 pub fn trace_id(&self) -> &str {
270 &self.trace_id
271 }
272
273 pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
274 self.trace_id = trace_id.into();
275 }
276
277 pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
278 self.trace_id = trace_id.into();
279 self
280 }
281
282 pub fn with_module(mut self, module: crate::RuntimeModule) -> Self {
283 module.apply_to(&mut self);
284 self
285 }
286
287 pub fn entity_root(&self) -> EntityRoot {
288 self.entity_root.clone()
289 }
290
291 pub fn initial_graphs(&self) -> &[GraphNode] {
292 &self.initial_graphs
293 }
294
295 pub fn set_initial_graphs(&mut self, graphs: Vec<GraphNode>) {
296 self.initial_graphs = graphs;
297 }
298
299 pub fn with_metadata(mut self, metadata: impl MetadataStore + 'static) -> Self {
300 self.metadata = Some(Box::new(metadata));
301 self
302 }
303
304 pub fn set_metadata(&mut self, metadata: impl MetadataStore + 'static) {
305 self.metadata = Some(Box::new(metadata));
306 }
307
308 pub fn with_repository_registry(mut self, registry: impl RepositoryRegistry + 'static) -> Self {
309 self.repository_registry = Some(Box::new(registry));
310 self
311 }
312
313 pub fn set_repository_registry(&mut self, registry: impl RepositoryRegistry + 'static) {
314 self.repository_registry = Some(Box::new(registry));
315 }
316
317 pub fn with_repository_behavior_registry(
318 mut self,
319 registry: impl RepositoryBehaviorRegistry + 'static,
320 ) -> Self {
321 self.repository_behavior_registry = Some(Box::new(registry));
322 self
323 }
324
325 pub fn set_repository_behavior_registry(
326 &mut self,
327 registry: impl RepositoryBehaviorRegistry + 'static,
328 ) {
329 self.repository_behavior_registry = Some(Box::new(registry));
330 }
331
332 pub fn with_request_policy(mut self, policy: impl RequestPolicy + 'static) -> Self {
333 self.request_policy = Some(Box::new(policy));
334 self
335 }
336
337 pub fn set_request_policy(&mut self, policy: impl RequestPolicy + 'static) {
338 self.request_policy = Some(Box::new(policy));
339 }
340
341 pub fn clear_request_policy(&mut self) {
342 self.request_policy = None;
343 }
344
345 pub fn with_checker_registry(mut self, registry: impl CheckerRegistry + 'static) -> Self {
346 self.checker_registry = Some(Box::new(registry));
347 self
348 }
349
350 pub fn set_checker_registry(&mut self, registry: impl CheckerRegistry + 'static) {
351 self.checker_registry = Some(Box::new(registry));
352 }
353
354 pub(crate) fn with_event_sink(mut self, sink: impl RawAuditEventSink + 'static) -> Self {
355 self.event_sink = Some(Box::new(sink));
356 self
357 }
358
359 pub(crate) fn set_event_sink(&mut self, sink: impl RawAuditEventSink + 'static) {
360 self.event_sink = Some(Box::new(sink));
361 }
362
363 pub fn with_custom_event_sink(mut self, sink: impl crate::SafeAuditEventSink + 'static) -> Self {
364 self.custom_event_sink = Some(Box::new(sink));
365 self
366 }
367
368 pub fn set_custom_event_sink(&mut self, sink: impl crate::SafeAuditEventSink + 'static) {
369 self.custom_event_sink = Some(Box::new(sink));
370 }
371
372 pub fn with_internal_id_generator(
373 mut self,
374 generator: impl InternalIdGenerator + 'static,
375 ) -> Self {
376 self.internal_id_generator = Some(Box::new(generator));
377 self
378 }
379
380 pub fn set_internal_id_generator(&mut self, generator: impl InternalIdGenerator + 'static) {
381 self.internal_id_generator = Some(Box::new(generator));
382 }
383
384 pub fn with_schema_provider(mut self, provider: impl SchemaProvider + 'static) -> Self {
385 self.schema_provider = Some(Box::new(provider));
386 self
387 }
388
389 pub fn set_schema_provider(&mut self, provider: impl SchemaProvider + 'static) {
390 self.schema_provider = Some(Box::new(provider));
391 }
392
393 pub async fn ensure_schema(&self) -> Result<(), RuntimeError> {
394 let provider = self
395 .schema_provider
396 .as_ref()
397 .ok_or_else(|| RuntimeError::Schema("missing schema provider".to_owned()))?;
398 provider.ensure_schema(self).await
399 }
400
401 pub fn with_language(mut self, language: Language) -> Self {
402 self.language = language;
403 self
404 }
405
406 pub fn set_language(&mut self, language: Language) {
407 self.language = language;
408 }
409
410 pub fn with_sql_log_options(mut self, options: SqlLogOptions) -> Self {
411 self.sql_log_options = options;
412 self
413 }
414
415 pub fn set_sql_log_options(&mut self, options: SqlLogOptions) {
416 self.sql_log_options = options;
417 }
418
419 pub fn enable_select_sql_log(&mut self) {
420 self.sql_log_options.select = true;
421 }
422
423 pub fn enable_mutation_sql_log(&mut self) {
424 self.sql_log_options.mutation = true;
425 }
426
427 pub fn enable_all_sql_log(&mut self) {
428 self.sql_log_options = SqlLogOptions::all();
429 }
430
431 pub fn disable_sql_log(&mut self) {
432 self.sql_log_options = SqlLogOptions::disabled();
433 self.clear_sql_logs();
434 }
435
436 pub fn sql_log_options(&self) -> SqlLogOptions {
437 self.sql_log_options
438 }
439
440 pub fn sql_logs(&self) -> Vec<SqlLogEntry> {
441 self.sql_log_entries
442 .lock()
443 .map(|entries| entries.clone())
444 .unwrap_or_default()
445 }
446
447 pub fn clear_sql_logs(&self) {
448 if let Ok(mut entries) = self.sql_log_entries.lock() {
449 entries.clear();
450 }
451 }
452
453 pub(crate) fn record_sql_log(
454 &self,
455 operation: SqlLogOperation,
456 query: &CompiledQuery,
457 database_kind: DatabaseKind,
458 started_at: SystemTime,
459 ended_at: SystemTime,
460 elapsed: Duration,
461 result_count: Option<usize>,
462 result_type: Option<String>,
463 affected_rows: Option<u64>,
464 trace_chain: Vec<teaql_core::TraceNode>,
465 ) {
466 if !self.sql_log_options.enabled_for(operation) {
467 return;
468 }
469 let debug_sql = query.debug_sql(database_kind);
470 let result_summary = sql_result_summary(
471 operation,
472 result_count,
473 result_type.as_deref(),
474 affected_rows,
475 &debug_sql,
476 );
477
478 let sql_log_entry = SqlLogEntry {
479 operation,
480 sql: query.sql.clone(),
481 params: query.params.clone(),
482 pretty_sql: pretty_sql(&debug_sql),
483 debug_sql: debug_sql.clone(),
484 started_at,
485 ended_at,
486 elapsed,
487 result_summary: result_summary.clone(),
488 result_count,
489 result_type,
490 affected_rows,
491 };
492
493 if let Ok(mut entries) = self.sql_log_entries.lock() {
494 entries.push(sql_log_entry.clone());
498 }
499
500 if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
501 if let Ok(mut entries) = buf.entries.lock() {
502 entries.push(UnifiedLogEntry {
503 timestamp: started_at,
504 user_identifier: self.user_identifier.clone(),
505 trace_chain: trace_chain.clone(),
506 payload: LogPayload::Sql(sql_log_entry.clone()),
507 });
508 }
509 }
510
511 crate::log_formatter::LogManager::write_sql_log(&trace_chain, &sql_log_entry);
512 }
513
514 pub(crate) fn record_metadata_log(&self, metadata: &teaql_data_service::ExecutionMetadata) {
515 if let Some(debug_sql) = &metadata.debug_query {
516 let sql_log_entry = SqlLogEntry {
517 operation: match metadata.operation {
518 teaql_data_service::DataServiceOperation::Query => SqlLogOperation::Select,
519 teaql_data_service::DataServiceOperation::Insert => SqlLogOperation::Insert,
520 teaql_data_service::DataServiceOperation::Update => SqlLogOperation::Update,
521 teaql_data_service::DataServiceOperation::Delete => SqlLogOperation::Delete,
522 teaql_data_service::DataServiceOperation::Recover => SqlLogOperation::Update, teaql_data_service::DataServiceOperation::Batch => SqlLogOperation::Update,
524 teaql_data_service::DataServiceOperation::Schema => SqlLogOperation::Update,
525 },
526 sql: String::new(), params: Vec::new(), pretty_sql: pretty_sql(debug_sql),
529 debug_sql: debug_sql.clone(),
530 started_at: metadata.started_at,
531 ended_at: metadata.ended_at,
532 elapsed: metadata.ended_at.duration_since(metadata.started_at).unwrap_or_default(),
533 result_count: metadata.result_count,
534 result_type: None, affected_rows: metadata.affected_rows,
536 result_summary: String::new(), };
538
539 let mut summary = String::new();
541 if let Some(c) = metadata.result_count {
542 summary = format!("{} rows returned", c);
543 } else if let Some(a) = metadata.affected_rows {
544 summary = format!("{} rows affected", a);
545 }
546
547 let mut final_entry = sql_log_entry;
548 final_entry.result_summary = summary;
549
550 if let Ok(mut entries) = self.sql_log_entries.lock() {
551 entries.push(final_entry.clone());
552 }
553
554 if let Some(buf) = self.get_resource::<UnifiedLogBuffer>() {
555 if let Ok(mut entries) = buf.entries.lock() {
556 entries.push(UnifiedLogEntry {
557 timestamp: metadata.started_at,
558 user_identifier: self.user_identifier.clone(),
559 trace_chain: metadata.trace_chain.clone(),
560 payload: LogPayload::Sql(final_entry.clone()),
561 });
562 }
563 }
564
565 crate::log_formatter::LogManager::write_sql_log(&metadata.trace_chain, &final_entry);
566 }
567 }
568
569 pub fn language(&self) -> Language {
570 self.language
571 }
572
573 pub fn set_language_code(&mut self, code: &str) -> Result<(), RuntimeError> {
574 let Some(language) = Language::from_code(code) else {
575 return Err(RuntimeError::Language(format!(
576 "unsupported language code: {code}"
577 )));
578 };
579 self.language = language;
580 Ok(())
581 }
582
583 pub fn generate_id(&self, entity: &str) -> Result<Option<u64>, RuntimeError> {
584 self.internal_id_generator
585 .as_ref()
586 .map(|generator| generator.generate_id(entity))
587 .transpose()
588 }
589
590 pub fn next_id(&self, entity: &str) -> Result<u64, RuntimeError> {
591 match self.generate_id(entity)? {
592 Some(id) => Ok(id),
593 None => local_id_generator().generate_id(entity),
594 }
595 }
596
597 pub fn entity(&self, name: &str) -> Option<&EntityDescriptor> {
598 self.metadata
599 .as_ref()
600 .and_then(|metadata| metadata.entity(name))
601 }
602
603 pub fn all_entities(&self) -> Vec<&EntityDescriptor> {
604 self.metadata
605 .as_ref()
606 .map(|metadata| metadata.all_entities())
607 .unwrap_or_default()
608 }
609
610 pub fn require_entity(&self, name: &str) -> Result<&EntityDescriptor, RuntimeError> {
611 self.entity(name)
612 .ok_or_else(|| RuntimeError::MissingEntity(name.to_owned()))
613 }
614
615 pub fn insert_resource<T>(&mut self, resource: T)
616 where
617 T: Send + Sync + 'static,
618 {
619 self.typed_resources
620 .insert(TypeId::of::<T>(), Box::new(resource));
621 }
622
623 pub fn get_resource<T>(&self) -> Option<&T>
624 where
625 T: Send + Sync + 'static,
626 {
627 self.typed_resources
628 .get(&TypeId::of::<T>())
629 .and_then(|value| value.downcast_ref::<T>())
630 }
631
632 pub fn require_resource<T>(&self) -> Result<&T, ContextError>
633 where
634 T: Send + Sync + 'static,
635 {
636 self.get_resource::<T>()
637 .ok_or(ContextError::MissingTypedResource(
638 std::any::type_name::<T>(),
639 ))
640 }
641
642 pub fn insert_named_resource<T>(&mut self, name: impl Into<String>, resource: T)
643 where
644 T: Send + Sync + 'static,
645 {
646 self.named_resources.insert(name.into(), Box::new(resource));
647 }
648
649 pub fn get_named_resource<T>(&self, name: &str) -> Option<&T>
650 where
651 T: Send + Sync + 'static,
652 {
653 self.named_resources
654 .get(name)
655 .and_then(|value| value.downcast_ref::<T>())
656 }
657
658 pub fn require_named_resource<T>(&self, name: &str) -> Result<&T, ContextError>
659 where
660 T: Send + Sync + 'static,
661 {
662 self.get_named_resource::<T>(name)
663 .ok_or_else(|| ContextError::MissingResource(name.to_owned()))
664 }
665
666 pub fn put_local(&mut self, key: impl Into<String>, value: impl Into<Value>) {
667 self.locals.insert(key.into(), value.into());
668 }
669
670 pub fn local(&self, key: &str) -> Option<&Value> {
671 self.locals.get(key)
672 }
673
674 pub fn remove_local(&mut self, key: &str) -> Option<Value> {
675 self.locals.remove(key)
676 }
677
678 pub fn has_repository(&self, entity: &str) -> bool {
679 let in_registry = self
680 .repository_registry
681 .as_ref()
682 .map(|registry| registry.contains(entity))
683 .unwrap_or(false);
684 in_registry || self.entity(entity).is_some()
685 }
686
687 pub fn repository_behavior(
688 &self,
689 entity: &str,
690 ) -> Option<std::sync::Arc<dyn RepositoryBehavior>> {
691 self.repository_behavior_registry
692 .as_ref()
693 .and_then(|registry| registry.behavior(entity))
694 }
695
696 pub fn has_checker(&self, entity: &str) -> bool {
697 self.checker_registry
698 .as_ref()
699 .and_then(|registry| registry.checker(entity))
700 .is_some()
701 }
702
703 pub fn check_and_fix_record(
704 &self,
705 entity: &str,
706 record: &mut Record,
707 ) -> Result<(), RuntimeError> {
708 self.check_and_fix_record_at(entity, record, &ObjectLocation::root())
709 }
710
711 pub fn check_and_fix_record_at(
712 &self,
713 entity: &str,
714 record: &mut Record,
715 location: &ObjectLocation,
716 ) -> Result<(), RuntimeError> {
717 let Some(checker) = self
718 .checker_registry
719 .as_ref()
720 .and_then(|registry| registry.checker(entity))
721 else {
722 return Ok(());
723 };
724 let mut results = CheckResults::new();
725 checker.check_and_fix(self, record, location, &mut results);
726 if results.is_empty() {
727 Ok(())
728 } else {
729 self.translate_check_results(&mut results);
730 Err(RuntimeError::Check(results))
731 }
732 }
733
734 pub fn translate_check_results(&self, results: &mut CheckResults) {
735 for result in results {
736 result.message = Some(translate_check_result(self.language, result));
737 }
738 }
739
740 pub fn send_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
741 if let Some(sink) = self.event_sink.as_ref() {
742 sink.on_event(self, &event)?;
743 }
744 if let Some(sink) = self.custom_event_sink.as_ref() {
745 let (mask_fields, max_len) = if let Some(metadata) = &self.metadata {
746 if let Some(desc) = metadata.entity(&event.entity) {
747 (desc.audit_mask_fields.clone(), desc.audit_value_max_len)
748 } else {
749 (vec![], None)
750 }
751 } else {
752 (vec![], None)
753 };
754
755 let safe_event = event.build_safe_event(&mask_fields, max_len);
756 sink.on_safe_event(self, &safe_event)?;
757 }
758
759 crate::log_formatter::LogManager::write_audit_log(&event);
760
761 Ok(())
762 }
763
764 pub async fn commit_changes<E>(&self) -> Result<(), RepositoryError<E::Error>>
765 where
766 E: teaql_data_service::MutationExecutor + Send + Sync + 'static,
767 {
768 let executor = self.require_resource::<E>().map_err(|err| {
769 RepositoryError::Runtime(RuntimeError::Graph(format!(
770 "cannot commit changes without executor: {err}"
771 )))
772 })?;
773 let change_set = self.entity_root.current_change_set();
774
775 for (key, changes) in change_set.changes() {
776 if changes.is_empty() {
777 continue;
778 }
779 let _entity = self
780 .require_entity(&key.entity)
781 .map_err(RepositoryError::Runtime)?;
782 let mut command = UpdateCommand::new(&key.entity, key.id.clone());
783 for (field, value) in changes {
784 command = command.value(field.clone(), value.clone());
785 }
786 let request = teaql_data_service::MutationRequest::Update(command);
787 executor
788 .mutate(request).await
789 .map_err(RepositoryError::Executor)?;
790 }
791
792 self.entity_root.clear_current_change_set();
793 Ok(())
794 }
795
796 pub async fn get_in_store(&self, key: &str) -> Option<Value> {
797 if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
798 store.get(key).await
799 } else {
800 None
801 }
802 }
803
804 pub async fn put_in_store(&self, key: &str, value: impl Into<Value>, timeout_seconds: Option<u64>) {
805 if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
806 store.put(key, value.into(), timeout_seconds).await;
807 }
808 }
809
810 pub async fn clear_in_store(&self, key: &str) {
811 if let Some(store) = self.get_resource::<Box<dyn DataStore>>() {
812 store.remove(key).await;
813 }
814 }
815}
816
817fn extract_id_from_sql(sql: &str) -> Option<String> {
818 let sql_lower = sql.to_lowercase();
819 let where_idx = sql_lower.find("where")?;
820 let where_clause = &sql_lower[where_idx + 5..];
821
822 let bytes = where_clause.as_bytes();
823 let mut i = 0;
824 while i < bytes.len() {
825 if i + 1 < bytes.len() && &bytes[i..i+2] == b"id" {
826 let prev_ok = if i == 0 {
828 true
829 } else {
830 let prev_char = bytes[i - 1] as char;
831 !prev_char.is_ascii_alphanumeric() && prev_char != '_' && prev_char != '.'
832 };
833 let next_ok = if i + 2 == bytes.len() {
835 true
836 } else {
837 let next_char = bytes[i + 2] as char;
838 !next_char.is_ascii_alphanumeric() && next_char != '_'
839 };
840
841 if prev_ok && next_ok {
842 let mut j = i + 2;
845 while j < bytes.len() && (bytes[j] as char).is_whitespace() {
846 j += 1;
847 }
848 if j < bytes.len() && bytes[j] == b'=' {
849 j += 1;
850 while j < bytes.len() && (bytes[j] as char).is_whitespace() {
851 j += 1;
852 }
853 let mut val_str = String::new();
855 if j < bytes.len() && bytes[j] == b'\'' {
856 j += 1; while j < bytes.len() && bytes[j] != b'\'' {
858 val_str.push(bytes[j] as char);
859 j += 1;
860 }
861 return Some(val_str);
862 } else {
863 while j < bytes.len() {
864 let c = bytes[j] as char;
865 if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
866 val_str.push(c);
867 j += 1;
868 } else {
869 break;
870 }
871 }
872 if !val_str.is_empty() {
873 return Some(val_str);
874 }
875 }
876 }
877 }
878 }
879 i += 1;
880 }
881 None
882}
883
884fn sql_result_summary(
885 operation: SqlLogOperation,
886 result_count: Option<usize>,
887 result_type: Option<&str>,
888 affected_rows: Option<u64>,
889 debug_sql: &str,
890) -> String {
891 match operation {
892 SqlLogOperation::Select => {
893 let count = result_count.unwrap_or(0);
894 if count == 0 {
895 "MISS".to_owned()
896 } else if count > 1 {
897 match result_type {
898 Some(result_type) => format!("{count}*{result_type}"),
899 None => format!("{count}*rows"),
900 }
901 } else {
902 match result_type {
903 Some(result_type) => {
904 if let Some(id) = extract_id_from_sql(debug_sql) {
905 format!("{result_type}({id})")
906 } else {
907 result_type.to_owned()
908 }
909 }
910 None => "row".to_owned(),
911 }
912 }
913 }
914 _ => {
915 let affected = affected_rows.unwrap_or(0);
916 format!("{affected} UPDATED")
917 }
918 }
919}
920
921fn pretty_sql(sql: &str) -> String {
922 let mut pretty = sql.to_owned();
923 for keyword in [
924 " FROM ",
925 " WHERE ",
926 " GROUP BY ",
927 " HAVING ",
928 " ORDER BY ",
929 " LIMIT ",
930 " OFFSET ",
931 " RETURNING ",
932 ] {
933 pretty = pretty.replace(keyword, &format!("\n{}", keyword.trim_start()));
934 }
935 pretty
936 .replace(" AND ", "\n AND ")
937}