Skip to main content

osproxy_engine/
lib.rs

1//! Pipeline orchestration.
2//!
//! Drives a request through the stages — authenticate, authorize, classify,
//! resolve, transform, dispatch, reverse-transform, egress (`docs/04` §1) —
//! wiring the other crates together through `osproxy-core` types and
//! `osproxy-spi` traits. It owns no low-level wire or parsing detail.
7//!
//! M1 lands the write-path core: [`build_write_batch`] turns a resolved routing
//! decision plus the request body into the epoch-stamped
//! [`WriteBatch`](osproxy_sink::WriteBatch) the sink delivers. M2 adds the
//! get-by-id read path: the [`Pipeline`] maps a client's logical id to the
//! physical id, fetches it through the [`Reader`](osproxy_sink::Reader) seam,
//! and strips the injected tenancy fields so the client sees its logical
//! document. This is the write→read round-trip symmetry the model rests on.
15#![deny(missing_docs)]
16
17mod admin;
18mod asyncwrite;
19mod bulk;
20mod bulkline;
21mod bulkprep;
22mod cursor;
23mod dbq;
24mod endpoints;
25mod error;
26mod mget;
27mod msearch;
28mod observe;
29mod passthrough;
30mod pipeline;
31mod pit;
32mod plan;
33mod read;
34mod retry;
35mod search_scan;
36mod search_stream;
37
38pub use admin::AdminPolicy;
39pub use asyncwrite::{
40    op_id_for, unsupported_async, valid_op_id, NoQueue, QueueError, QueuedWrite, WriteMode,
41    WriteQueue,
42};
43pub use error::RequestError;
44pub use passthrough::PassthroughPolicy;
45pub use pipeline::{Pipeline, PipelineResponse};
46pub use plan::build_write_batch;
47pub use retry::RetryPolicy;
48pub use search_stream::StreamSearch;
49
/// Internal entry points exposed **only** for benchmarks (`benches/`), which are
/// separate crates and so cannot reach `pub(crate)` items. Hidden from docs and
/// not part of the public API; do not depend on it.
53#[doc(hidden)]
54pub mod bench_support {
55    use osproxy_core::FieldName;
56    use osproxy_spi::{DocIdRule, IdTemplate};
57
58    use crate::read::{shape_hits, ReadShape};
59    use crate::search_scan::{HitShaper, SearchHitsScanner};
60
61    /// The shared-index read shape the reference tenancy produces: strip
62    /// `_tenant`, invert the `{partition}:{body.id}` id rule, drop `_routing`.
63    fn shape() -> ReadShape {
64        ReadShape {
65            inject_names: vec![FieldName::from("_tenant")],
66            id_rule: Some(
67                DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true),
68            ),
69        }
70    }
71
72    /// Builds a realistic `_search` response: `n_hits` shared-index hits (each
73    /// carrying the injected `_tenant`) plus an `aggregations` blob of `agg_bytes`,
74    /// the sibling the proxy forwards verbatim past the hits.
75    #[must_use]
76    pub fn response(n_hits: usize, agg_bytes: usize) -> Vec<u8> {
77        let mut s = String::from(r#"{"took":5,"hits":{"total":{"value":"#);
78        s.push_str(&n_hits.to_string());
79        s.push_str(r#"},"hits":["#);
80        for i in 0..n_hits {
81            if i > 0 {
82                s.push(',');
83            }
84            s.push_str(r#"{"_index":"shared","_id":"acme:"#);
85            s.push_str(&i.to_string());
86            s.push_str(r#"","_routing":"acme","_source":{"_tenant":"acme","msg":"record number "#);
87            s.push_str(&i.to_string());
88            s.push_str(r#""}}"#);
89        }
90        s.push_str(r#"]},"aggregations":{"blob":""#);
91        s.push_str(&"x".repeat(agg_bytes));
92        s.push_str(r#""}}"#);
93        s.into_bytes()
94    }
95
96    /// The buffered transform (`shape_hits`): parse the top level, materialize
97    /// and strip the `hits` subtree, reserialize.
98    #[must_use]
99    pub fn buffered(body: &[u8]) -> Vec<u8> {
100        shape_hits(body, "orders", "acme", &shape()).unwrap_or_default()
101    }
102
103    /// The streaming transform: feed the whole body through the resumable scanner
104    /// in one chunk (the per-byte work the live pipeline does incrementally as
105    /// upstream frames arrive).
106    #[must_use]
107    pub fn streaming(body: &[u8]) -> Vec<u8> {
108        let shaper = HitShaper {
109            logical_index: "orders".to_owned(),
110            partition: "acme".to_owned(),
111            shape: shape(),
112        };
113        let mut scanner = SearchHitsScanner::new(shaper);
114        let mut out = scanner.feed(body);
115        out.extend(scanner.finish());
116        out
117    }
118}