Skip to main content

greentic_operator/
provider_config_envelope.rs

1use std::fs::File;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4
5use anyhow::{Context, anyhow};
6use chrono::Utc;
7use greentic_types::cbor::canonical;
8use greentic_types::decode_pack_manifest;
9use greentic_types::schemas::common::schema_ir::SchemaIr;
10use greentic_types::schemas::component::v0_6_0::ComponentDescribe;
11use greentic_types::schemas::component::v0_6_0::{
12    ComponentOperation, ComponentRunInput, ComponentRunOutput,
13};
14use serde::{Deserialize, Serialize};
15use serde_json::{Value as JsonValue, json};
16use zip::ZipArchive;
17
18use crate::runtime_state::atomic_write;
19
20const ABI_VERSION: &str = "greentic:component@0.6.0";
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConfigEnvelope {
24    pub config: JsonValue,
25    pub component_id: String,
26    pub abi_version: String,
27    pub resolved_digest: String,
28    pub describe_hash: String,
29    #[serde(default, skip_serializing_if = "Option::is_none")]
30    pub schema_hash: Option<String>,
31    pub operation_id: String,
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    pub updated_at: Option<String>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ContractCacheEntry {
38    pub component_id: String,
39    pub abi_version: String,
40    pub resolved_digest: String,
41    pub describe_hash: String,
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub schema_hash: Option<String>,
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub config_schema: Option<JsonValue>,
46}
47
48struct PackProvenance {
49    component_id: String,
50    resolved_digest: String,
51    describe_hash: String,
52    schema_hash: Option<String>,
53    config_schema: Option<JsonValue>,
54}
55
56pub fn write_provider_config_envelope(
57    providers_root: &Path,
58    provider_id: &str,
59    operation_id: &str,
60    config: &JsonValue,
61    pack_path: &Path,
62    backup: bool,
63) -> anyhow::Result<PathBuf> {
64    let provenance = read_pack_provenance(pack_path, provider_id)?;
65    let _ = write_contract_cache_entry(providers_root, &provenance);
66    let envelope = ConfigEnvelope {
67        config: config.clone(),
68        component_id: provenance.component_id,
69        abi_version: ABI_VERSION.to_string(),
70        resolved_digest: provenance.resolved_digest,
71        describe_hash: provenance.describe_hash,
72        schema_hash: provenance.schema_hash,
73        operation_id: operation_id.to_string(),
74        // Useful for audit/debug; exclude from deterministic comparisons.
75        updated_at: Some(Utc::now().to_rfc3339()),
76    };
77    let bytes = canonical::to_canonical_cbor(&envelope).map_err(|err| anyhow!("{err}"))?;
78    let path = providers_root
79        .join(provider_id)
80        .join("config.envelope.cbor");
81    if backup && path.exists() {
82        let backup_path = path.with_extension("cbor.bak");
83        if let Some(parent) = backup_path.parent() {
84            std::fs::create_dir_all(parent)?;
85        }
86        std::fs::copy(&path, &backup_path)?;
87    }
88    atomic_write(&path, &bytes)?;
89    Ok(path)
90}
91
92pub fn read_provider_config_envelope(
93    providers_root: &Path,
94    provider_id: &str,
95) -> anyhow::Result<Option<ConfigEnvelope>> {
96    let path = providers_root
97        .join(provider_id)
98        .join("config.envelope.cbor");
99    if !path.exists() {
100        return Ok(None);
101    }
102    let bytes = std::fs::read(&path)?;
103    let envelope: ConfigEnvelope = serde_cbor::from_slice(&bytes)?;
104    Ok(Some(envelope))
105}
106
107pub fn resolved_describe_hash(
108    pack_path: &Path,
109    fallback_component_id: &str,
110) -> anyhow::Result<String> {
111    Ok(read_pack_provenance(pack_path, fallback_component_id)?.describe_hash)
112}
113
114pub fn ensure_contract_compatible(
115    providers_root: &Path,
116    provider_id: &str,
117    flow_id: &str,
118    pack_path: &Path,
119    allow_contract_change: bool,
120) -> anyhow::Result<()> {
121    let Some(stored) = read_provider_config_envelope(providers_root, provider_id)? else {
122        return Ok(());
123    };
124    let resolved = resolved_describe_hash(pack_path, provider_id)?;
125    if stored.describe_hash != resolved && !allow_contract_change {
126        return Err(anyhow!(
127            "OP_CONTRACT_DRIFT: provider={} flow={} stored_describe_hash={} resolved_describe_hash={} (pass --allow-contract-change to override)",
128            provider_id,
129            flow_id,
130            stored.describe_hash,
131            resolved
132        ));
133    }
134    Ok(())
135}
136
137fn write_contract_cache_entry(
138    providers_root: &Path,
139    provenance: &PackProvenance,
140) -> anyhow::Result<PathBuf> {
141    let cache_dir = providers_root.join("_contracts");
142    let path = cache_dir.join(format!("{}.contract.cbor", provenance.resolved_digest));
143    let entry = ContractCacheEntry {
144        component_id: provenance.component_id.clone(),
145        abi_version: ABI_VERSION.to_string(),
146        resolved_digest: provenance.resolved_digest.clone(),
147        describe_hash: provenance.describe_hash.clone(),
148        schema_hash: provenance.schema_hash.clone(),
149        config_schema: provenance.config_schema.clone(),
150    };
151    let bytes = canonical::to_canonical_cbor(&entry).map_err(|err| anyhow!("{err}"))?;
152    atomic_write(&path, &bytes)?;
153    Ok(path)
154}
155
156fn read_pack_provenance(
157    pack_path: &Path,
158    fallback_component_id: &str,
159) -> anyhow::Result<PackProvenance> {
160    let pack_bytes = std::fs::read(pack_path).unwrap_or_default();
161    let resolved_digest = digest_hex(&pack_bytes);
162    let manifest_bytes = read_manifest_cbor_bytes(pack_path).ok();
163    let manifest = manifest_bytes
164        .as_ref()
165        .and_then(|bytes| decode_pack_manifest(bytes).ok());
166
167    let Some(manifest) = manifest else {
168        return Ok(PackProvenance {
169            component_id: fallback_component_id.to_string(),
170            resolved_digest,
171            describe_hash: digest_hex(fallback_component_id.as_bytes()),
172            schema_hash: None,
173            config_schema: None,
174        });
175    };
176
177    let component = manifest.components.first();
178    let component_id = component
179        .map(|value| value.id.to_string())
180        .unwrap_or_else(|| fallback_component_id.to_string());
181
182    let describe = ComponentDescribe {
183        info: greentic_types::schemas::component::v0_6_0::ComponentInfo {
184            id: component_id.clone(),
185            version: component
186                .map(|value| value.version.to_string())
187                .unwrap_or_else(|| "0.0.0".to_string()),
188            role: "provider".to_string(),
189            display_name: None,
190        },
191        provided_capabilities: Vec::new(),
192        required_capabilities: Vec::new(),
193        metadata: Default::default(),
194        operations: component
195            .map(|value| {
196                value
197                    .operations
198                    .iter()
199                    .map(|op| ComponentOperation {
200                        id: op.name.clone(),
201                        display_name: None,
202                        input: ComponentRunInput {
203                            schema: SchemaIr::Null,
204                        },
205                        output: ComponentRunOutput {
206                            schema: SchemaIr::Null,
207                        },
208                        defaults: Default::default(),
209                        redactions: Vec::new(),
210                        constraints: Default::default(),
211                        schema_hash: digest_hex(op.name.as_bytes()),
212                    })
213                    .collect::<Vec<_>>()
214            })
215            .unwrap_or_default(),
216        config_schema: SchemaIr::Null,
217    };
218    let describe_hash = hash_canonical(&describe)?;
219
220    let schema_hash = component
221        .map(|value| {
222            let schema_payload = json!({
223                "input": JsonValue::Null,
224                "output": JsonValue::Null,
225                "config": value.config_schema.clone().unwrap_or(JsonValue::Null),
226            });
227            hash_canonical(&schema_payload)
228        })
229        .transpose()?;
230
231    Ok(PackProvenance {
232        component_id,
233        resolved_digest,
234        describe_hash,
235        schema_hash,
236        config_schema: component.and_then(|value| value.config_schema.clone()),
237    })
238}
239
240fn hash_canonical<T: Serialize>(value: &T) -> anyhow::Result<String> {
241    let cbor = canonical::to_canonical_cbor(value).map_err(|err| anyhow!("{err}"))?;
242    Ok(digest_hex(&cbor))
243}
244
245fn digest_hex(bytes: &[u8]) -> String {
246    let digest = canonical::blake3_128(bytes);
247    let mut out = String::with_capacity(digest.len() * 2);
248    for byte in digest {
249        out.push(hex_nibble(byte >> 4));
250        out.push(hex_nibble(byte & 0x0f));
251    }
252    out
253}
254
255fn hex_nibble(value: u8) -> char {
256    match value {
257        0..=9 => (b'0' + value) as char,
258        10..=15 => (b'a' + (value - 10)) as char,
259        _ => '0',
260    }
261}
262
263fn read_manifest_cbor_bytes(pack_path: &Path) -> anyhow::Result<Vec<u8>> {
264    let file = File::open(pack_path)?;
265    let mut archive = ZipArchive::new(file)?;
266    let mut manifest = archive
267        .by_name("manifest.cbor")
268        .with_context(|| format!("manifest.cbor missing in {}", pack_path.display()))?;
269    let mut bytes = Vec::new();
270    manifest.read_to_end(&mut bytes)?;
271    Ok(bytes)
272}
273
274#[cfg(test)]
275mod tests {
276    use super::*;
277    use std::io::Write;
278    use tempfile::tempdir;
279    use zip::write::FileOptions;
280
281    #[test]
282    fn writes_cbor_envelope() {
283        let temp = tempdir().unwrap();
284        let pack = temp.path().join("provider.gtpack");
285        write_test_pack(&pack).unwrap();
286
287        let providers_root = temp
288            .path()
289            .join("state")
290            .join("runtime")
291            .join("demo")
292            .join("providers");
293        let path = write_provider_config_envelope(
294            &providers_root,
295            "messaging-telegram",
296            "setup_default",
297            &json!({"token":"abc"}),
298            &pack,
299            false,
300        )
301        .unwrap();
302
303        assert!(path.ends_with("messaging-telegram/config.envelope.cbor"));
304        let bytes = std::fs::read(path).unwrap();
305        let decoded: ConfigEnvelope = serde_cbor::from_slice(&bytes).unwrap();
306        assert_eq!(decoded.component_id, "messaging-telegram");
307        assert_eq!(decoded.operation_id, "setup_default");
308        assert_eq!(decoded.abi_version, ABI_VERSION);
309        assert!(decoded.updated_at.is_some());
310        assert_eq!(decoded.config, json!({"token":"abc"}));
311        assert!(!decoded.describe_hash.is_empty());
312        assert!(!decoded.resolved_digest.is_empty());
313        let contracts = providers_root.join("_contracts");
314        assert!(contracts.exists());
315    }
316
317    #[test]
318    fn reports_contract_drift_without_override() {
319        let temp = tempdir().unwrap();
320        let pack = temp.path().join("provider.gtpack");
321        write_test_pack(&pack).unwrap();
322        let providers_root = temp
323            .path()
324            .join("state")
325            .join("runtime")
326            .join("demo")
327            .join("providers");
328        let provider_id = "messaging-telegram";
329        let provider_dir = providers_root.join(provider_id);
330        std::fs::create_dir_all(&provider_dir).unwrap();
331        let envelope = ConfigEnvelope {
332            config: json!({"token":"abc"}),
333            component_id: provider_id.to_string(),
334            abi_version: ABI_VERSION.to_string(),
335            resolved_digest: "digest".to_string(),
336            describe_hash: "different".to_string(),
337            schema_hash: None,
338            operation_id: "setup_default".to_string(),
339            updated_at: None,
340        };
341        let bytes = canonical::to_canonical_cbor(&envelope).unwrap();
342        std::fs::write(provider_dir.join("config.envelope.cbor"), bytes).unwrap();
343
344        let err =
345            ensure_contract_compatible(&providers_root, provider_id, "setup_default", &pack, false)
346                .unwrap_err();
347        assert!(err.to_string().contains("OP_CONTRACT_DRIFT"));
348    }
349
350    fn write_test_pack(path: &Path) -> anyhow::Result<()> {
351        let file = File::create(path)?;
352        let mut zip = zip::ZipWriter::new(file);
353        zip.start_file("manifest.cbor", FileOptions::<()>::default())?;
354        let manifest = json!({
355            "schema_version": "1.0.0",
356            "pack_id": "messaging-telegram",
357            "name": "messaging-telegram",
358            "version": "1.0.0",
359            "kind": "provider",
360            "publisher": "tests",
361            "components": [{
362                "id": "messaging-telegram",
363                "version": "1.0.0",
364                "supports": ["provider"],
365                "world": "greentic:component/component-v0-v6-v0@0.6.0",
366                "profiles": {},
367                "capabilities": { "provides": ["messaging"], "requires": [] },
368                "configurators": null,
369                "operations": [],
370                "config_schema": {"type":"object"},
371                "resources": {},
372                "dev_flows": {}
373            }],
374            "flows": [],
375            "dependencies": [],
376            "capabilities": [],
377            "secret_requirements": [],
378            "signatures": [],
379            "extensions": {}
380        });
381        let bytes = greentic_types::cbor::canonical::to_canonical_cbor(&manifest)
382            .map_err(|err| anyhow!("{err}"))?;
383        zip.write_all(&bytes)?;
384        zip.finish()?;
385        Ok(())
386    }
387}