Skip to main content

linguasteg_core/
orchestration.rs

1use crate::{
2    CoreResult, DecodeRequest, FixedWidthPlanningOptions, GatewayMessage, GatewayMessageRole,
3    GatewayOperation, GatewayRequest, GatewayResponse, LanguageRegistry, ModelGatewayRegistry,
4    ModelRegistry, StrategyRegistry, SymbolicFramePlan, SymbolicFrameSchema, SymbolicPayloadPlan,
5    SymbolicPayloadPlanner, ValidatedDecodeRequest, ValidatedEncodeRequest,
6    decode_payload_from_symbolic_frames, validate_decode_request, validate_encode_request,
7};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct OrchestratedEncodeResult {
11    pub validated: ValidatedEncodeRequest,
12    pub symbolic_plan: SymbolicPayloadPlan,
13    pub gateway_response: Option<GatewayResponse>,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct OrchestratedDecodeResult {
18    pub validated: ValidatedDecodeRequest,
19    pub payload: Vec<u8>,
20    pub gateway_response: Option<GatewayResponse>,
21}
22
23pub struct PipelineOrchestrator<'a> {
24    language_registry: &'a dyn LanguageRegistry,
25    strategy_registry: &'a dyn StrategyRegistry,
26    model_registry: &'a dyn ModelRegistry,
27    gateway_registry: &'a dyn ModelGatewayRegistry,
28    symbolic_planner: &'a dyn SymbolicPayloadPlanner,
29    symbolic_options: FixedWidthPlanningOptions,
30}
31
32impl<'a> PipelineOrchestrator<'a> {
33    pub fn new(
34        language_registry: &'a dyn LanguageRegistry,
35        strategy_registry: &'a dyn StrategyRegistry,
36        model_registry: &'a dyn ModelRegistry,
37        gateway_registry: &'a dyn ModelGatewayRegistry,
38        symbolic_planner: &'a dyn SymbolicPayloadPlanner,
39    ) -> Self {
40        Self {
41            language_registry,
42            strategy_registry,
43            model_registry,
44            gateway_registry,
45            symbolic_planner,
46            symbolic_options: FixedWidthPlanningOptions::default(),
47        }
48    }
49
50    pub fn with_symbolic_options(mut self, options: FixedWidthPlanningOptions) -> Self {
51        self.symbolic_options = options;
52        self
53    }
54
55    pub fn orchestrate_encode(
56        &self,
57        request: crate::EncodeRequest,
58        schemas: &[SymbolicFrameSchema],
59    ) -> CoreResult<OrchestratedEncodeResult> {
60        let validated = validate_encode_request(
61            &request,
62            self.language_registry,
63            self.strategy_registry,
64            self.model_registry,
65        )?;
66        let symbolic_plan = self
67            .symbolic_planner
68            .plan_payload(&request.payload, schemas)?;
69        let gateway_response = self
70            .build_encode_gateway_request(&validated, &request)
71            .map(|gateway_request| {
72                let gateway = self
73                    .gateway_registry
74                    .route(&gateway_request.provider, &gateway_request.model)?;
75                gateway.complete(gateway_request)
76            })
77            .transpose()?;
78
79        Ok(OrchestratedEncodeResult {
80            validated,
81            symbolic_plan,
82            gateway_response,
83        })
84    }
85
86    pub fn orchestrate_decode(
87        &self,
88        request: DecodeRequest,
89        frames: &[SymbolicFramePlan],
90        schemas: &[SymbolicFrameSchema],
91    ) -> CoreResult<OrchestratedDecodeResult> {
92        let validated = validate_decode_request(
93            &request,
94            self.language_registry,
95            self.strategy_registry,
96            self.model_registry,
97        )?;
98        let payload = decode_payload_from_symbolic_frames(frames, schemas, &self.symbolic_options)?;
99        let gateway_response = self
100            .build_decode_gateway_request(&validated, &request)
101            .map(|gateway_request| {
102                let gateway = self
103                    .gateway_registry
104                    .route(&gateway_request.provider, &gateway_request.model)?;
105                gateway.complete(gateway_request)
106            })
107            .transpose()?;
108
109        Ok(OrchestratedDecodeResult {
110            validated,
111            payload,
112            gateway_response,
113        })
114    }
115
116    fn build_encode_gateway_request(
117        &self,
118        validated: &ValidatedEncodeRequest,
119        request: &crate::EncodeRequest,
120    ) -> Option<GatewayRequest> {
121        validated.model.as_ref().map(|model| GatewayRequest {
122            provider: model.provider.clone(),
123            model: model.model.clone(),
124            language: validated.language.tag.clone(),
125            strategy: validated.strategy.id.clone(),
126            operation: GatewayOperation::Encode,
127            messages: vec![GatewayMessage {
128                role: GatewayMessageRole::User,
129                content: request.carrier_text.clone(),
130            }],
131            seed: None,
132            max_tokens: None,
133        })
134    }
135
136    fn build_decode_gateway_request(
137        &self,
138        validated: &ValidatedDecodeRequest,
139        request: &DecodeRequest,
140    ) -> Option<GatewayRequest> {
141        validated.model.as_ref().map(|model| GatewayRequest {
142            provider: model.provider.clone(),
143            model: model.model.clone(),
144            language: validated.language.tag.clone(),
145            strategy: validated.strategy.id.clone(),
146            operation: GatewayOperation::Decode,
147            messages: vec![GatewayMessage {
148                role: GatewayMessageRole::User,
149                content: request.stego_text.clone(),
150            }],
151            seed: None,
152            max_tokens: None,
153        })
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use crate::{
160        DecodeRequest, EncodeRequest, FixedWidthBitPlanner, GatewayFinishReason, GatewayOperation,
161        GatewayRequest, GatewayResponse, LanguageDescriptor, LanguageRegistry, LanguageTag,
162        ModelCapability, ModelDescriptor, ModelGateway, ModelGatewayRegistry, ModelId,
163        ModelRegistry, ModelSelection, PipelineOptions, ProviderId, StrategyDescriptor, StrategyId,
164        StrategyRegistry, SymbolicFieldSpec, SymbolicFrameSchema, TextDirection,
165    };
166
167    use super::PipelineOrchestrator;
168    use crate::SymbolicPayloadPlanner;
169
170    struct TestLanguageRegistry {
171        values: Vec<LanguageDescriptor>,
172    }
173
174    impl LanguageRegistry for TestLanguageRegistry {
175        fn all_languages(&self) -> &[LanguageDescriptor] {
176            &self.values
177        }
178    }
179
180    struct TestStrategyRegistry {
181        values: Vec<StrategyDescriptor>,
182    }
183
184    impl StrategyRegistry for TestStrategyRegistry {
185        fn all_strategies(&self) -> &[StrategyDescriptor] {
186            &self.values
187        }
188    }
189
190    struct TestModelRegistry {
191        values: Vec<ModelDescriptor>,
192    }
193
194    impl ModelRegistry for TestModelRegistry {
195        fn all_models(&self) -> &[ModelDescriptor] {
196            &self.values
197        }
198    }
199
200    struct TestGateway {
201        provider: ProviderId,
202    }
203
204    impl ModelGateway for TestGateway {
205        fn provider(&self) -> &ProviderId {
206            &self.provider
207        }
208
209        fn complete(&self, request: GatewayRequest) -> crate::CoreResult<GatewayResponse> {
210            let content = format!(
211                "{}:{}:{}",
212                operation_name(request.operation),
213                request.language,
214                request.messages.len()
215            );
216            Ok(GatewayResponse {
217                content,
218                finish_reason: GatewayFinishReason::Stop,
219                usage: None,
220            })
221        }
222    }
223
224    struct TestGatewayRegistry {
225        gateways: Vec<TestGateway>,
226    }
227
228    impl ModelGatewayRegistry for TestGatewayRegistry {
229        fn gateway(&self, provider: &ProviderId) -> Option<&dyn ModelGateway> {
230            self.gateways
231                .iter()
232                .find(|gateway| gateway.provider == *provider)
233                .map(|gateway| gateway as &dyn ModelGateway)
234        }
235    }
236
237    #[test]
238    fn orchestrate_encode_validates_routes_gateway_and_plans_symbolic_frames() {
239        let planner = FixedWidthBitPlanner::default();
240        let languages = language_registry();
241        let strategies = strategy_registry_requires_model();
242        let models = model_registry("stub");
243        let gateways = gateway_registry_with("stub");
244        let orchestrator =
245            PipelineOrchestrator::new(&languages, &strategies, &models, &gateways, &planner);
246        let request = EncodeRequest {
247            carrier_text: "carrier text".to_string(),
248            payload: vec![0xAA, 0xBB],
249            options: PipelineOptions {
250                language: LanguageTag::new("fa").expect("valid language"),
251                strategy: StrategyId::new("symbolic").expect("valid strategy"),
252                model_selection: Some(ModelSelection {
253                    provider: ProviderId::new("stub").expect("valid provider"),
254                    model: ModelId::new("test-model").expect("valid model"),
255                }),
256            },
257        };
258        let schemas = sample_schemas();
259
260        let result = orchestrator
261            .orchestrate_encode(request, &schemas)
262            .expect("encode orchestration should succeed");
263
264        assert!(!result.symbolic_plan.frames.is_empty());
265        assert!(result.gateway_response.is_some());
266        assert!(
267            result
268                .gateway_response
269                .expect("gateway response should exist")
270                .content
271                .contains("encode:fa:1")
272        );
273    }
274
275    #[test]
276    fn orchestrate_decode_reconstructs_payload_without_gateway_when_model_is_absent() {
277        let planner = FixedWidthBitPlanner::default();
278        let payload = vec![0x10, 0x20, 0x30];
279        let schemas = sample_schemas();
280        let symbolic_plan = planner
281            .plan_payload(&payload, &schemas)
282            .expect("planning should succeed");
283        let languages = language_registry();
284        let strategies = strategy_registry_without_model_requirement();
285        let models = model_registry("stub");
286        let gateways = gateway_registry_with("stub");
287        let orchestrator =
288            PipelineOrchestrator::new(&languages, &strategies, &models, &gateways, &planner);
289        let request = DecodeRequest {
290            stego_text: "stego sample".to_string(),
291            options: PipelineOptions {
292                language: LanguageTag::new("fa").expect("valid language"),
293                strategy: StrategyId::new("symbolic-lite").expect("valid strategy"),
294                model_selection: None,
295            },
296        };
297
298        let result = orchestrator
299            .orchestrate_decode(request, &symbolic_plan.frames, &schemas)
300            .expect("decode orchestration should succeed");
301
302        assert_eq!(result.payload, payload);
303        assert!(result.gateway_response.is_none());
304    }
305
306    #[test]
307    fn orchestrate_encode_fails_when_provider_cannot_be_routed() {
308        let planner = FixedWidthBitPlanner::default();
309        let languages = language_registry();
310        let strategies = strategy_registry_requires_model();
311        let models = model_registry("unknown-provider");
312        let gateways = gateway_registry_with("stub");
313        let orchestrator =
314            PipelineOrchestrator::new(&languages, &strategies, &models, &gateways, &planner);
315        let request = EncodeRequest {
316            carrier_text: "carrier text".to_string(),
317            payload: vec![0xAA],
318            options: PipelineOptions {
319                language: LanguageTag::new("fa").expect("valid language"),
320                strategy: StrategyId::new("symbolic").expect("valid strategy"),
321                model_selection: Some(ModelSelection {
322                    provider: ProviderId::new("unknown-provider").expect("valid provider"),
323                    model: ModelId::new("test-model").expect("valid model"),
324                }),
325            },
326        };
327
328        let error = orchestrator
329            .orchestrate_encode(request, &sample_schemas())
330            .expect_err("orchestration should fail");
331        assert!(error.to_string().contains("model is not supported"));
332    }
333
334    fn operation_name(operation: GatewayOperation) -> &'static str {
335        match operation {
336            GatewayOperation::Encode => "encode",
337            GatewayOperation::Decode => "decode",
338            GatewayOperation::Analyze => "analyze",
339        }
340    }
341
342    fn language_registry() -> TestLanguageRegistry {
343        TestLanguageRegistry {
344            values: vec![LanguageDescriptor {
345                tag: LanguageTag::new("fa").expect("valid language"),
346                display_name: "Persian".to_string(),
347                direction: TextDirection::RightToLeft,
348            }],
349        }
350    }
351
352    fn strategy_registry_requires_model() -> TestStrategyRegistry {
353        TestStrategyRegistry {
354            values: vec![StrategyDescriptor {
355                id: StrategyId::new("symbolic").expect("valid strategy"),
356                display_name: "Symbolic".to_string(),
357                required_capabilities: vec![ModelCapability::DeterministicSeed],
358            }],
359        }
360    }
361
362    fn strategy_registry_without_model_requirement() -> TestStrategyRegistry {
363        TestStrategyRegistry {
364            values: vec![StrategyDescriptor {
365                id: StrategyId::new("symbolic-lite").expect("valid strategy"),
366                display_name: "Symbolic Lite".to_string(),
367                required_capabilities: Vec::new(),
368            }],
369        }
370    }
371
372    fn model_registry(provider: &str) -> TestModelRegistry {
373        TestModelRegistry {
374            values: vec![ModelDescriptor {
375                provider: ProviderId::new(provider).expect("valid provider"),
376                model: ModelId::new("test-model").expect("valid model"),
377                display_name: "Test Model".to_string(),
378                supported_languages: vec![LanguageTag::new("fa").expect("valid language")],
379                capabilities: vec![ModelCapability::DeterministicSeed],
380            }],
381        }
382    }
383
384    fn gateway_registry_with(provider: &str) -> TestGatewayRegistry {
385        TestGatewayRegistry {
386            gateways: vec![TestGateway {
387                provider: ProviderId::new(provider).expect("valid provider"),
388            }],
389        }
390    }
391
392    fn sample_schemas() -> Vec<SymbolicFrameSchema> {
393        vec![SymbolicFrameSchema {
394            template_id: crate::TemplateId::new("fa-test").expect("valid template"),
395            fields: vec![SymbolicFieldSpec {
396                slot: crate::SlotId::new("payload").expect("valid slot"),
397                bit_width: 8,
398            }],
399        }]
400    }
401}