Skip to main content

innate_core/kb/
recall.rs

1use super::*;
2
3/// Parameters for [`KnowledgeBase::recall`].
4///
5/// Borrowed, `Default`-able: construct with `RecallParams { query, budget, source, ..Default::default() }`.
6/// Empty-string defaults are normalized inside `recall`: `expand_deps` empty → `"false"`,
7/// `refine_mode` empty → `"off"`.
8#[derive(Debug, Clone, Default)]
9pub struct RecallParams<'a> {
10    pub query: &'a str,
11    pub budget: usize,
12    pub trace: bool,
13    pub include_sparks: bool,
14    pub top: Option<usize>,
15    pub source: &'a str,
16    pub expand_deps: &'a str, // "false" | "direct" | "closure"
17    pub allow_trim: bool,     // if true, invoke Refiner::trim when block doesn't fit
18    pub refine_mode: &'a str, // "off" | "trim" | "adapt" — recorded in trace
19    /// Relevance gate: drop candidates whose fused score is below this value
20    /// **before** packing/trace, so the trace only records knowledge that was
21    /// actually surfaced. `None` disables the gate. Used by always-on hooks
22    /// (UserPromptSubmit / SessionStart) to stay high-frequency without noise.
23    pub min_score: Option<f64>,
24}
25
26impl KnowledgeBase {
27    pub fn recall(&self, params: RecallParams<'_>) -> Result<RecallResult> {
28        let RecallParams {
29            query,
30            budget,
31            trace,
32            include_sparks,
33            top,
34            source,
35            expand_deps,
36            allow_trim,
37            refine_mode,
38            min_score,
39        } = params;
40        let expand_deps = if expand_deps.is_empty() {
41            "false"
42        } else {
43            expand_deps
44        };
45        let refine_mode = if refine_mode.is_empty() {
46            "off"
47        } else {
48            refine_mode
49        };
50        validate_source(source)?;
51        let trace_id = gen_uuid();
52        let now = utc_now_iso();
53
54        // Calibration path: derive the context_key from a Situation. A bare query degrades
55        // exactly to the legacy `content_hash(normalize_query(query))`, so recall stays
56        // zero-regression while sharing one key derivation with appraise (Spec §2.2).
57        let situation = Situation::from_query(query);
58        let context_key = situation.context_key(&self.situation_coarse_keys);
59
60        let (q_content, q_trigger) = self
61            .embedding
62            .embed_both(query)
63            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
64
65        // ANN candidates (non-spark)
66        let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
67        self.apply_soft_dep_bonus(&mut candidates)?;
68
69        // Score + anti-trigger penalty
70        let mut scored = self.score_candidates(candidates, query, &context_key)?;
71
72        // Relevance gate — drop sub-threshold candidates before packing/trace so the
73        // trace records only what was actually surfaced (keeps selected→used stats clean).
74        if let Some(min) = min_score {
75            scored.retain(|(fused, _)| *fused >= min);
76        }
77
78        // First-fit pack with dep expansion
79        let (selected, skipped, skipped_reasons) =
80            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
81
82        let depth_skipped: Vec<String> = skipped_reasons
83            .iter()
84            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
85            .map(|(id, _)| id.clone())
86            .collect();
87
88        // Density refill
89        let mut selected = selected;
90        if self.density_refill {
91            selected = self.density_refill(selected, &skipped, budget);
92        }
93
94        let limited = limit_knowledge(selected, top);
95        let visible = if refine_mode == "adapt" {
96            self.refiner
97                .refine(limited.clone(), Some(budget))
98                .unwrap_or(limited)
99        } else {
100            limited
101        };
102
103        // Sparks
104        let sparks = if include_sparks {
105            self.recall_sparks(&q_content, &q_trigger)?
106        } else {
107            vec![]
108        };
109
110        if trace {
111            self.write_recall_trace(
112                &trace_id,
113                query,
114                &context_key,
115                &scored,
116                &visible,
117                &sparks,
118                &depth_skipped,
119                &skipped_reasons,
120                refine_mode,
121                source,
122                &now,
123            )?;
124        }
125
126        let empty = visible.is_empty() && sparks.is_empty();
127        Ok(RecallResult {
128            knowledge: visible,
129            sparks,
130            trace_id,
131            empty,
132            depth_skipped,
133            skipped_reasons,
134        })
135    }
136
137    pub(super) fn ann_candidates(
138        &self,
139        q_content: &[f32],
140        q_trigger: &[f32],
141    ) -> Result<HashMap<String, CandidateInfo>> {
142        let embed_version = self
143            .storage
144            .get_meta("embed_version")?
145            .and_then(|v| v.parse::<i64>().ok())
146            .unwrap_or(1);
147
148        let content_res = self
149            .storage
150            .search_vec_content(q_content, self.top_k_candidates * 2)?;
151        let trigger_res = self
152            .storage
153            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
154
155        // Collect unique ids and batch-fetch all chunks in two queries instead of N individual ones.
156        let all_ids: Vec<&str> = {
157            let mut seen = HashSet::new();
158            content_res
159                .iter()
160                .chain(trigger_res.iter())
161                .map(|(id, _)| id.as_str())
162                .filter(|id| seen.insert(*id))
163                .collect()
164        };
165        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
166
167        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
168
169        for (cid, sim) in &content_res {
170            if let Some(chunk) = chunks.get(cid) {
171                if chunk_is_valid_for_recall(chunk, embed_version) {
172                    let e = candidates
173                        .entry(cid.clone())
174                        .or_insert_with(|| CandidateInfo {
175                            chunk: chunk.clone(),
176                            sim_content: 0.0,
177                            sim_trigger: 0.0,
178                        });
179                    e.sim_content = e.sim_content.max(*sim);
180                }
181            }
182        }
183        for (cid, sim) in &trigger_res {
184            if let Some(chunk) = chunks.get(cid) {
185                if chunk_is_valid_for_recall(chunk, embed_version) {
186                    let e = candidates
187                        .entry(cid.clone())
188                        .or_insert_with(|| CandidateInfo {
189                            chunk: chunk.clone(),
190                            sim_content: 0.0,
191                            sim_trigger: 0.0,
192                        });
193                    e.sim_trigger = e.sim_trigger.max(*sim);
194                }
195            }
196        }
197        Ok(candidates)
198    }
199
200    pub(super) fn apply_soft_dep_bonus(
201        &self,
202        candidates: &mut HashMap<String, CandidateInfo>,
203    ) -> Result<()> {
204        // Collect non-spark candidate ids and batch-fetch their outgoing deps
205        // in a single query (was one get_deps per candidate).
206        let src_ids: Vec<String> = candidates
207            .iter()
208            .filter(|(_, info)| {
209                info.chunk.get("origin").and_then(Value::as_str) != Some("spark")
210            })
211            .map(|(cid, _)| cid.clone())
212            .collect();
213        if src_ids.is_empty() {
214            return Ok(());
215        }
216        let src_refs: Vec<&str> = src_ids.iter().map(String::as_str).collect();
217        let deps_map = self.storage.get_deps_batch(&src_refs)?;
218
219        // Gather distinct soft-dep targets and batch-fetch them in one query
220        // (was one get_chunk per soft edge).
221        let mut target_ids: Vec<String> = Vec::new();
222        let mut seen: HashSet<String> = HashSet::new();
223        for deps in deps_map.values() {
224            for (dst, kind, _) in deps {
225                if kind == "soft" && seen.insert(dst.clone()) {
226                    target_ids.push(dst.clone());
227                }
228            }
229        }
230        if target_ids.is_empty() {
231            return Ok(());
232        }
233        let target_refs: Vec<&str> = target_ids.iter().map(String::as_str).collect();
234        let targets = self.storage.get_chunks_by_ids(&target_refs)?;
235
236        for src in &src_ids {
237            let Some(deps) = deps_map.get(src) else {
238                continue;
239            };
240            for (dst, kind, _) in deps {
241                if kind != "soft" {
242                    continue;
243                }
244                let Some(target) = targets.get(dst) else {
245                    continue;
246                };
247                if target.get("state").and_then(Value::as_str) == Some("archived") {
248                    continue;
249                }
250                if target.get("origin").and_then(Value::as_str) == Some("spark") {
251                    continue;
252                }
253                let e = candidates
254                    .entry(dst.clone())
255                    .or_insert_with(|| CandidateInfo {
256                        chunk: target.clone(),
257                        sim_content: 0.0,
258                        sim_trigger: 0.0,
259                    });
260                e.sim_content = (e.sim_content + 0.05).min(1.0);
261            }
262        }
263        Ok(())
264    }
265
266    fn score_candidates(
267        &self,
268        candidates: HashMap<String, CandidateInfo>,
269        query: &str,
270        context_key: &str,
271    ) -> Result<Vec<(f64, Value)>> {
272        // Batch-fetch context scores for all candidates in one query
273        // (was one context_score lookup per candidate).
274        let cand_ids: Vec<String> = candidates
275            .values()
276            .filter_map(|info| info.chunk.get("id").and_then(Value::as_str).map(str::to_string))
277            .collect();
278        let cand_refs: Vec<&str> = cand_ids.iter().map(String::as_str).collect();
279        let ctx_scores = self.storage.context_scores_batch(&cand_refs, context_key)?;
280
281        let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
282        for info in candidates.into_values() {
283            let conf = info
284                .chunk
285                .get("confidence")
286                .and_then(Value::as_f64)
287                .unwrap_or(0.5);
288            let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
289            let context_score = ctx_scores.get(chunk_id).copied().unwrap_or(0.0);
290            let mut fused = self.w_content * info.sim_content as f64
291                + self.w_trigger * info.sim_trigger as f64
292                + self.w_confidence * conf
293                + self.w_context * context_score;
294            if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
295                fused *= PENDING_RECALL_PENALTY;
296            }
297            let anti = info
298                .chunk
299                .get("anti_trigger_desc")
300                .and_then(Value::as_str)
301                .unwrap_or("");
302            if !anti.is_empty() && anti_trigger_hit(query, anti) {
303                fused *= self.anti_trigger_penalty;
304            }
305            let mut chunk = info.chunk;
306            chunk["_context_score"] = json!(context_score);
307            chunk["_fused_score"] = json!(fused);
308            scored.push((fused, chunk));
309        }
310        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
311        scored.truncate(self.top_k_candidates);
312        Ok(scored)
313    }
314
315    fn pack(
316        &self,
317        scored: &[(f64, Value)],
318        budget: usize,
319        expand_deps: &str,
320        allow_trim: bool,
321        query: &str,
322    ) -> Result<PackResult> {
323        let mut selected: Vec<Value> = vec![];
324        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
325        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
326        let mut used_ids: HashSet<String> = HashSet::new();
327        let mut used_tokens: usize = 0;
328
329        for (fused, chunk) in scored {
330            let cid = chunk["id"].as_str().unwrap_or("").to_string();
331            if used_ids.contains(&cid) {
332                continue;
333            }
334
335            // Build block with dep expansion; fail-closed on dep issues.
336            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
337            if let Some(reason) = dep_skip_reason {
338                skipped_reasons.insert(cid, reason);
339                continue;
340            }
341
342            let new_block: Vec<Value> = block
343                .iter()
344                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
345                .cloned()
346                .collect();
347            let cost = block_cost(&new_block);
348
349            if used_tokens + cost <= budget {
350                for b in &block {
351                    let bid = b["id"].as_str().unwrap_or("").to_string();
352                    if !used_ids.contains(&bid) {
353                        let mut b = b.clone();
354                        b["_fused_score"] = json!(fused);
355                        selected.push(b);
356                        used_ids.insert(bid);
357                    }
358                }
359                used_tokens += cost;
360            } else if allow_trim {
361                // Attempt refiner trim — NullRefiner returns None (no-op).
362                if let Some(trimmed) =
363                    self.refiner
364                        .trim(&block, query, budget.saturating_sub(used_tokens))
365                {
366                    let trim_cost = block_cost(&trimmed);
367                    if used_tokens + trim_cost <= budget {
368                        for b in &trimmed {
369                            let bid = b["id"].as_str().unwrap_or("").to_string();
370                            if !used_ids.contains(&bid) {
371                                let mut b = b.clone();
372                                b["_fused_score"] = json!(fused);
373                                b["_trimmed"] = json!(true);
374                                selected.push(b);
375                                used_ids.insert(bid);
376                            }
377                        }
378                        used_tokens += trim_cost;
379                        continue;
380                    }
381                }
382                skipped.push((block, *fused, cost));
383            } else {
384                skipped.push((block, *fused, cost));
385            }
386        }
387        Ok((selected, skipped, skipped_reasons))
388    }
389
390    /// Expand a seed chunk into a block according to `expand_deps`.
391    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
392    fn build_dep_block(
393        &self,
394        seed: &Value,
395        expand_deps: &str,
396    ) -> Result<(Vec<Value>, Option<String>)> {
397        if expand_deps == "false" || expand_deps.is_empty() {
398            return Ok((vec![seed.clone()], None));
399        }
400        let seed_id = seed["id"].as_str().unwrap_or("");
401        match expand_deps {
402            "direct" => {
403                let deps = self.storage.get_deps(seed_id)?;
404                let mut block = vec![seed.clone()];
405                for (dep_id, kind, _) in &deps {
406                    if kind != "hard" {
407                        continue;
408                    }
409                    match self.validate_hard_dep(dep_id)? {
410                        Some(chunk) => block.push(chunk),
411                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
412                    }
413                }
414                Ok((block, None))
415            }
416            "closure" => {
417                let mut block = vec![seed.clone()];
418                let mut visited: HashSet<String> = [seed_id.to_string()].into();
419                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
420                    Some(reason) => Ok((vec![], Some(reason))),
421                    None => Ok((block, None)),
422                }
423            }
424            _ => Ok((vec![seed.clone()], None)),
425        }
426    }
427
428    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
429    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
430        match self.storage.get_chunk(dep_id)? {
431            None => Ok(None),
432            Some(chunk) => {
433                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
434                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
435                let embed_v = chunk
436                    .get("embed_version")
437                    .and_then(Value::as_i64)
438                    .unwrap_or(0);
439                if state == "archived" || origin == "spark" || embed_v == 0 {
440                    Ok(None)
441                } else {
442                    Ok(Some(chunk))
443                }
444            }
445        }
446    }
447
448    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
449    fn expand_hard_closure(
450        &self,
451        id: &str,
452        visited: &mut HashSet<String>,
453        block: &mut Vec<Value>,
454        depth: usize,
455        max_depth: usize,
456    ) -> Result<Option<String>> {
457        if depth >= max_depth {
458            return Ok(Some("dep_depth_limit".to_string()));
459        }
460        let deps = self.storage.get_deps(id)?;
461        for (dep_id, kind, _) in &deps {
462            if kind != "hard" {
463                continue;
464            }
465            if visited.contains(dep_id) {
466                continue;
467            } // cycle guard
468            visited.insert(dep_id.clone());
469            match self.validate_hard_dep(dep_id)? {
470                None => return Ok(Some("hard_dep_unavailable".to_string())),
471                Some(chunk) => {
472                    block.push(chunk);
473                    if let Some(reason) =
474                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
475                    {
476                        return Ok(Some(reason));
477                    }
478                }
479            }
480        }
481        Ok(None)
482    }
483
484    fn density_refill(
485        &self,
486        mut selected: Vec<Value>,
487        skipped: &[(Vec<Value>, f64, usize)],
488        budget: usize,
489    ) -> Vec<Value> {
490        let used_tokens = block_cost(&selected);
491        if used_tokens >= budget {
492            return selected;
493        }
494
495        let selected_ids: HashSet<String> = selected
496            .iter()
497            .filter_map(|c| c["id"].as_str().map(str::to_string))
498            .collect();
499
500        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
501            .iter()
502            .filter_map(|(block, fscore, _)| {
503                let block: Vec<Value> = block
504                    .iter()
505                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
506                    .cloned()
507                    .collect();
508                if block.is_empty() {
509                    return None;
510                }
511                let cost = block_cost(&block);
512                let density = fscore / cost.max(1) as f64;
513                Some((density, block, cost))
514            })
515            .collect();
516        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
517
518        let mut used_tokens = block_cost(&selected);
519        let mut added_ids: HashSet<String> = selected_ids;
520        for (_, block, cost) in density_items {
521            if used_tokens + cost <= budget {
522                for b in block {
523                    let bid = b["id"].as_str().unwrap_or("").to_string();
524                    if !added_ids.contains(&bid) {
525                        selected.push(b);
526                        added_ids.insert(bid);
527                    }
528                }
529                used_tokens += cost;
530            }
531        }
532        selected
533    }
534
535    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
536        let embed_version = self
537            .storage
538            .get_meta("embed_version")?
539            .and_then(|v| v.parse::<i64>().ok())
540            .unwrap_or(1);
541
542        let content_res = self
543            .storage
544            .search_vec_content(q_content, self.top_k_candidates)?;
545        let trigger_res = self
546            .storage
547            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
548
549        // Batch-fetch all candidate chunk IDs (mirrors the pattern in ann_candidates).
550        let all_ids: Vec<&str> = {
551            let mut seen = HashSet::new();
552            content_res
553                .iter()
554                .chain(trigger_res.iter())
555                .map(|(id, _)| id.as_str())
556                .filter(|id| seen.insert(*id))
557                .collect()
558        };
559        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
560
561        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
562        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
563            if let Some(chunk) = chunks.get(cid) {
564                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
565                    continue;
566                }
567                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
568                    continue;
569                }
570                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
571                if maturity == "promoted" || maturity == "dropped" {
572                    continue;
573                }
574                let ev = chunk
575                    .get("embed_version")
576                    .and_then(Value::as_i64)
577                    .unwrap_or(1);
578                if ev < embed_version {
579                    continue;
580                }
581                let entry = spark_scores
582                    .entry(cid.clone())
583                    .or_insert_with(|| (*sim, chunk.clone()));
584                if *sim > entry.0 {
585                    *entry = (*sim, chunk.clone());
586                }
587            }
588        }
589        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
590        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
591        Ok(sparks
592            .into_iter()
593            .take(self.top_k_candidates)
594            .map(|(_, c)| c)
595            .collect())
596    }
597
598    #[allow(clippy::too_many_arguments)]
599    fn write_recall_trace(
600        &self,
601        trace_id: &str,
602        query: &str,
603        context_key: &str,
604        scored: &[(f64, Value)],
605        visible: &[Value],
606        sparks: &[Value],
607        depth_skipped: &[String],
608        skipped_reasons: &HashMap<String, String>,
609        refine_mode: &str,
610        source: &str,
611        now: &str,
612    ) -> Result<()> {
613        let lib_id = self.storage.lib_id()?;
614        self.storage.begin_immediate()?;
615        let result = (|| -> Result<()> {
616            for (rank, (_, chunk)) in scored.iter().enumerate() {
617                let cid = chunk["id"].as_str().unwrap_or("");
618                let sim = chunk.get("_fused_score").and_then(Value::as_f64);
619                // For dep-skipped seeds, record their skip reason as refine_mode.
620                let rm = skipped_reasons
621                    .get(cid)
622                    .map(|r| format!("skipped:{r}"))
623                    .or_else(|| {
624                        if refine_mode != "off" && !refine_mode.is_empty() {
625                            Some(refine_mode.to_string())
626                        } else {
627                            None
628                        }
629                    });
630                self.storage.insert_usage_trace(
631                    trace_id,
632                    Some(cid),
633                    "retrieved",
634                    1.0,
635                    sim,
636                    rm.as_deref(),
637                    None,
638                    Some((rank + 1) as i64),
639                    None,
640                    source,
641                    now,
642                )?;
643            }
644            for (rank, chunk) in visible.iter().enumerate() {
645                let cid = chunk["id"].as_str().unwrap_or("");
646                self.storage.insert_usage_trace(
647                    trace_id,
648                    Some(cid),
649                    "selected",
650                    1.0,
651                    None,
652                    None,
653                    None,
654                    Some((rank + 1) as i64),
655                    None,
656                    source,
657                    now,
658                )?;
659                // Write 'refined' event for chunks that came through the trim path.
660                if chunk
661                    .get("_trimmed")
662                    .and_then(Value::as_bool)
663                    .unwrap_or(false)
664                {
665                    self.storage.insert_usage_trace(
666                        trace_id,
667                        Some(cid),
668                        "refined",
669                        1.0,
670                        None,
671                        Some("trim"),
672                        None,
673                        Some((rank + 1) as i64),
674                        None,
675                        source,
676                        now,
677                    )?;
678                }
679            }
680            // Write 'retrieved' events for sparks (for recurring-spark count tracking).
681            for (rank, chunk) in sparks.iter().enumerate() {
682                let cid = chunk["id"].as_str().unwrap_or("");
683                self.storage.insert_usage_trace(
684                    trace_id,
685                    Some(cid),
686                    "retrieved",
687                    1.0,
688                    None,
689                    Some("spark"),
690                    None,
691                    Some((rank + 1) as i64),
692                    None,
693                    source,
694                    now,
695                )?;
696            }
697            let snapshot = json!({
698                "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
699                "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
700                "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
701                "depth_skipped": depth_skipped,
702                "skipped_reasons": skipped_reasons,
703            });
704            let log = EpisodicLogRow {
705                id: gen_uuid(),
706                trace_id: trace_id.to_string(),
707                lib_id,
708                ts: now.to_string(),
709                query: Some(query.to_string()),
710                recall_snapshot: Some(snapshot.to_string()),
711                event_source: source.to_string(),
712                task_state: "recalled".to_string(),
713                usage_state: "unknown".to_string(),
714                context_key: Some(context_key.to_string()),
715                distill_state: "open".to_string(),
716                ..Default::default()
717            };
718            self.storage.upsert_episodic_log(&log)?;
719            self.storage.commit()
720        })();
721        if result.is_err() {
722            let _ = self.storage.rollback();
723        }
724        result
725    }
726}