Skip to main content

osproxy_engine/
lib.rs

//! Pipeline orchestration.
//!
//! Drives a request through the stages — authenticate, authorize, classify,
//! resolve, transform, dispatch, reverse-transform, egress (`docs/04` §1) —
//! wiring the other crates together through `osproxy-core` types and
//! `osproxy-spi` traits. It owns no low-level wire or parsing detail.
//!
//! M1 lands the write-path core: [`build_write_batch`] turns a resolved routing
//! decision plus the request body into the epoch-stamped
//! [`WriteBatch`](osproxy_sink::WriteBatch) the sink delivers. M2 adds the
//! get-by-id read path: the [`Pipeline`] maps a client's logical id to the
//! physical id, fetches it through the [`Reader`](osproxy_sink::Reader) seam,
//! and strips the injected tenancy fields so the client sees its logical
//! document — the write→read round-trip symmetry the model rests on.
15#![deny(missing_docs)]
16
17mod admin;
18mod asyncwrite;
19mod bulk;
20mod bulkprep;
21mod cursor;
22mod dbq;
23mod endpoints;
24mod error;
25mod mget;
26mod msearch;
27mod observe;
28mod passthrough;
29mod pipeline;
30mod pit;
31mod plan;
32mod read;
33mod retry;
34mod search_scan;
35mod search_stream;
36
37pub use admin::AdminPolicy;
38pub use asyncwrite::{
39    op_id_for, unsupported_async, valid_op_id, NoQueue, QueueError, QueuedWrite, WriteMode,
40    WriteQueue,
41};
42pub use error::RequestError;
43pub use passthrough::PassthroughPolicy;
44pub use pipeline::{Pipeline, PipelineResponse};
45pub use plan::build_write_batch;
46pub use retry::RetryPolicy;
47pub use search_stream::StreamSearch;
48
/// Internal entry points exposed **only** for benchmarks (`benches/`), which are
/// separate crates and so cannot reach `pub(crate)` items. Hidden from docs and
/// not part of the public API; do not depend on it.
52#[doc(hidden)]
53pub mod bench_support {
54    use osproxy_core::FieldName;
55    use osproxy_spi::{DocIdRule, IdTemplate};
56
57    use crate::read::{shape_hits, ReadShape};
58    use crate::search_scan::{HitShaper, SearchHitsScanner};
59
60    /// The shared-index read shape the reference tenancy produces: strip
61    /// `_tenant`, invert the `{partition}:{body.id}` id rule, drop `_routing`.
62    fn shape() -> ReadShape {
63        ReadShape {
64            inject_names: vec![FieldName::from("_tenant")],
65            id_rule: Some(
66                DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true),
67            ),
68        }
69    }
70
71    /// Builds a realistic `_search` response: `n_hits` shared-index hits (each
72    /// carrying the injected `_tenant`) plus an `aggregations` blob of `agg_bytes`,
73    /// the sibling the proxy forwards verbatim past the hits.
74    #[must_use]
75    pub fn response(n_hits: usize, agg_bytes: usize) -> Vec<u8> {
76        let mut s = String::from(r#"{"took":5,"hits":{"total":{"value":"#);
77        s.push_str(&n_hits.to_string());
78        s.push_str(r#"},"hits":["#);
79        for i in 0..n_hits {
80            if i > 0 {
81                s.push(',');
82            }
83            s.push_str(r#"{"_index":"shared","_id":"acme:"#);
84            s.push_str(&i.to_string());
85            s.push_str(r#"","_routing":"acme","_source":{"_tenant":"acme","msg":"record number "#);
86            s.push_str(&i.to_string());
87            s.push_str(r#""}}"#);
88        }
89        s.push_str(r#"]},"aggregations":{"blob":""#);
90        s.push_str(&"x".repeat(agg_bytes));
91        s.push_str(r#""}}"#);
92        s.into_bytes()
93    }
94
95    /// The buffered transform (`shape_hits`): parse the top level, materialize
96    /// and strip the `hits` subtree, reserialize.
97    #[must_use]
98    pub fn buffered(body: &[u8]) -> Vec<u8> {
99        shape_hits(body, "orders", "acme", &shape()).unwrap_or_default()
100    }
101
102    /// The streaming transform: feed the whole body through the resumable scanner
103    /// in one chunk (the per-byte work the live pipeline does incrementally as
104    /// upstream frames arrive).
105    #[must_use]
106    pub fn streaming(body: &[u8]) -> Vec<u8> {
107        let shaper = HitShaper {
108            logical_index: "orders".to_owned(),
109            partition: "acme".to_owned(),
110            shape: shape(),
111        };
112        let mut scanner = SearchHitsScanner::new(shaper);
113        let mut out = scanner.feed(body);
114        out.extend(scanner.finish());
115        out
116    }
117}