// harn_vm/tenant.rs

1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use futures::stream::BoxStream;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use subtle::ConstantTimeEq;
10use time::OffsetDateTime;
11
12use crate::event_log::{
13    AnyEventLog, CompactReport, ConsumerId, EventId, EventLog, EventLogDescription, LogError,
14    LogEvent, Topic,
15};
16use crate::orchestration::CapabilityPolicy;
17use crate::secrets::{
18    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
19};
20use crate::TenantId;
21
/// Directory (relative to the orchestrator state root) that holds the registry
/// file and one sub-directory per tenant.
pub const TENANT_REGISTRY_DIR: &str = "tenants";
/// File name of the serialized tenant registry inside [`TENANT_REGISTRY_DIR`].
pub const TENANT_REGISTRY_FILE: &str = "registry.json";
/// Common prefix of tenant secret namespaces; the tenant id is appended to it.
pub const TENANT_SECRET_NAMESPACE_PREFIX: &str = "harn.tenant.";
/// Common prefix of tenant-scoped event topics; the tenant id and a `.` follow it.
pub const TENANT_EVENT_TOPIC_PREFIX: &str = "tenant.";
26
27#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
28#[serde(transparent)]
29pub struct ApiKeyId(pub String);
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32#[serde(default)]
33pub struct TenantBudget {
34    pub daily_cost_usd: Option<f64>,
35    pub hourly_cost_usd: Option<f64>,
36    pub ingest_per_minute: Option<u32>,
37    pub event_log_size_bytes: u64,
38    pub in_flight_dispatches: u32,
39    pub dlq_entries: u32,
40}
41
42impl Default for TenantBudget {
43    fn default() -> Self {
44        Self {
45            daily_cost_usd: None,
46            hourly_cost_usd: None,
47            ingest_per_minute: None,
48            event_log_size_bytes: 10 * 1024 * 1024 * 1024,
49            in_flight_dispatches: 100,
50            dlq_entries: 10_000,
51        }
52    }
53}
54
55#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
56pub struct TenantScope {
57    pub id: TenantId,
58    pub state_root: PathBuf,
59    pub secret_namespace: String,
60    pub event_log_topic_prefix: String,
61    pub capability_ceiling: CapabilityPolicy,
62    pub budget: TenantBudget,
63    pub api_key_ids: Vec<ApiKeyId>,
64}
65
66impl TenantScope {
67    pub fn new(id: TenantId, orchestrator_state_root: impl AsRef<Path>) -> Result<Self, String> {
68        validate_tenant_id(&id.0)?;
69        let state_root = orchestrator_state_root
70            .as_ref()
71            .join(TENANT_REGISTRY_DIR)
72            .join(&id.0);
73        Ok(Self {
74            secret_namespace: tenant_secret_namespace(&id),
75            event_log_topic_prefix: tenant_event_topic_prefix(&id),
76            id,
77            state_root,
78            capability_ceiling: CapabilityPolicy::default(),
79            budget: TenantBudget::default(),
80            api_key_ids: Vec::new(),
81        })
82    }
83
84    pub fn topic(&self, topic: &Topic) -> Result<Topic, LogError> {
85        tenant_topic(&self.id, topic)
86    }
87}
88
89#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(rename_all = "snake_case")]
91pub enum TenantStatus {
92    Active,
93    Suspended,
94}
95
96#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
97pub struct TenantApiKeyRecord {
98    pub id: ApiKeyId,
99    pub hash_sha256: String,
100    pub prefix: String,
101    pub created_at: String,
102}
103
104#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
105pub struct TenantRecord {
106    pub scope: TenantScope,
107    pub status: TenantStatus,
108    pub created_at: String,
109    pub suspended_at: Option<String>,
110    pub api_keys: Vec<TenantApiKeyRecord>,
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
114#[serde(default)]
115pub struct TenantRegistrySnapshot {
116    pub tenants: Vec<TenantRecord>,
117}
118
119#[derive(Clone, Debug)]
120pub struct TenantStore {
121    state_dir: PathBuf,
122    tenants: BTreeMap<String, TenantRecord>,
123}
124
125#[derive(Clone, Debug, PartialEq, Eq)]
126pub enum TenantResolutionError {
127    Unknown,
128    Suspended(TenantId),
129}
130
131impl TenantStore {
132    pub fn load(state_dir: impl AsRef<Path>) -> Result<Self, String> {
133        let state_dir = state_dir.as_ref().to_path_buf();
134        let path = registry_path(&state_dir);
135        if !path.is_file() {
136            return Ok(Self {
137                state_dir,
138                tenants: BTreeMap::new(),
139            });
140        }
141        let content = std::fs::read_to_string(&path)
142            .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
143        let snapshot: TenantRegistrySnapshot = serde_json::from_str(&content)
144            .map_err(|error| format!("failed to parse {}: {error}", path.display()))?;
145        let tenants = snapshot
146            .tenants
147            .into_iter()
148            .map(|record| (record.scope.id.0.clone(), record))
149            .collect();
150        Ok(Self { state_dir, tenants })
151    }
152
153    pub fn save(&self) -> Result<(), String> {
154        let dir = self.state_dir.join(TENANT_REGISTRY_DIR);
155        std::fs::create_dir_all(&dir)
156            .map_err(|error| format!("failed to create {}: {error}", dir.display()))?;
157        let snapshot = TenantRegistrySnapshot {
158            tenants: self.list().to_vec(),
159        };
160        let encoded = serde_json::to_string_pretty(&snapshot).map_err(|error| error.to_string())?;
161        let path = registry_path(&self.state_dir);
162        write_file_replace(&path, encoded.as_bytes())
163            .map_err(|error| format!("failed to write {}: {error}", path.display()))
164    }
165
166    pub fn create_tenant(
167        &mut self,
168        id: impl Into<String>,
169        budget: TenantBudget,
170    ) -> Result<(TenantRecord, String), String> {
171        let id = id.into();
172        validate_tenant_id(&id)?;
173        if self.tenants.contains_key(&id) {
174            return Err(format!("tenant '{id}' already exists"));
175        }
176        let api_key = generate_api_key(&id);
177        let api_key_id = ApiKeyId(format!("key_{}", uuid::Uuid::now_v7()));
178        let created_at = now_rfc3339();
179        let mut scope = TenantScope::new(TenantId::new(id.clone()), &self.state_dir)?;
180        scope.budget = budget;
181        scope.api_key_ids.push(api_key_id.clone());
182        std::fs::create_dir_all(&scope.state_root).map_err(|error| {
183            format!(
184                "failed to create tenant state dir {}: {error}",
185                scope.state_root.display()
186            )
187        })?;
188        let record = TenantRecord {
189            scope,
190            status: TenantStatus::Active,
191            created_at: created_at.clone(),
192            suspended_at: None,
193            api_keys: vec![TenantApiKeyRecord {
194                id: api_key_id,
195                hash_sha256: api_key_hash(&api_key),
196                prefix: api_key_prefix(&api_key),
197                created_at,
198            }],
199        };
200        self.tenants.insert(id, record.clone());
201        self.save()?;
202        Ok((record, api_key))
203    }
204
205    pub fn list(&self) -> Vec<TenantRecord> {
206        self.tenants.values().cloned().collect()
207    }
208
209    pub fn get(&self, id: &str) -> Option<&TenantRecord> {
210        self.tenants.get(id)
211    }
212
213    pub fn suspend(&mut self, id: &str) -> Result<TenantRecord, String> {
214        let record = self
215            .tenants
216            .get_mut(id)
217            .ok_or_else(|| format!("unknown tenant '{id}'"))?;
218        record.status = TenantStatus::Suspended;
219        record.suspended_at = Some(now_rfc3339());
220        let record = record.clone();
221        self.save()?;
222        Ok(record)
223    }
224
225    pub fn delete(&mut self, id: &str) -> Result<TenantRecord, String> {
226        let record = self
227            .tenants
228            .remove(id)
229            .ok_or_else(|| format!("unknown tenant '{id}'"))?;
230        if record.scope.state_root.exists() {
231            std::fs::remove_dir_all(&record.scope.state_root).map_err(|error| {
232                format!(
233                    "failed to remove tenant state dir {}: {error}",
234                    record.scope.state_root.display()
235                )
236            })?;
237        }
238        self.save()?;
239        Ok(record)
240    }
241
242    pub fn resolve_api_key(&self, candidate: &str) -> Result<TenantScope, TenantResolutionError> {
243        let candidate_hash = api_key_hash(candidate);
244        for record in self.tenants.values() {
245            let matched = record.api_keys.iter().any(|key| {
246                key.hash_sha256
247                    .as_bytes()
248                    .ct_eq(candidate_hash.as_bytes())
249                    .into()
250            });
251            if matched {
252                return match record.status {
253                    TenantStatus::Active => Ok(record.scope.clone()),
254                    TenantStatus::Suspended => {
255                        Err(TenantResolutionError::Suspended(record.scope.id.clone()))
256                    }
257                };
258            }
259        }
260        Err(TenantResolutionError::Unknown)
261    }
262}
263
264pub struct TenantEventLog {
265    inner: Arc<AnyEventLog>,
266    scope: TenantScope,
267}
268
269impl TenantEventLog {
270    pub fn new(inner: Arc<AnyEventLog>, scope: TenantScope) -> Self {
271        Self { inner, scope }
272    }
273
274    pub fn scope(&self) -> &TenantScope {
275        &self.scope
276    }
277
278    fn scoped_topic(&self, topic: &Topic) -> Result<Topic, LogError> {
279        if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
280            if topic
281                .as_str()
282                .starts_with(&self.scope.event_log_topic_prefix)
283            {
284                return Ok(topic.clone());
285            }
286            return Err(LogError::InvalidTopic(format!(
287                "topic '{}' is outside tenant scope '{}'",
288                topic.as_str(),
289                self.scope.id.0
290            )));
291        }
292        self.scope.topic(topic)
293    }
294}
295
296impl EventLog for TenantEventLog {
297    fn describe(&self) -> EventLogDescription {
298        self.inner.describe()
299    }
300
301    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
302        self.inner.append(&self.scoped_topic(topic)?, event).await
303    }
304
305    async fn flush(&self) -> Result<(), LogError> {
306        self.inner.flush().await
307    }
308
309    async fn read_range(
310        &self,
311        topic: &Topic,
312        from: Option<EventId>,
313        limit: usize,
314    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
315        self.inner
316            .read_range(&self.scoped_topic(topic)?, from, limit)
317            .await
318    }
319
320    async fn subscribe(
321        self: Arc<Self>,
322        topic: &Topic,
323        from: Option<EventId>,
324    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
325        self.inner
326            .clone()
327            .subscribe(&self.scoped_topic(topic)?, from)
328            .await
329    }
330
331    async fn ack(
332        &self,
333        topic: &Topic,
334        consumer: &ConsumerId,
335        up_to: EventId,
336    ) -> Result<(), LogError> {
337        self.inner
338            .ack(&self.scoped_topic(topic)?, consumer, up_to)
339            .await
340    }
341
342    async fn consumer_cursor(
343        &self,
344        topic: &Topic,
345        consumer: &ConsumerId,
346    ) -> Result<Option<EventId>, LogError> {
347        self.inner
348            .consumer_cursor(&self.scoped_topic(topic)?, consumer)
349            .await
350    }
351
352    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
353        self.inner.latest(&self.scoped_topic(topic)?).await
354    }
355
356    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
357        self.inner.compact(&self.scoped_topic(topic)?, before).await
358    }
359}
360
361pub struct TenantSecretProvider {
362    inner: Arc<dyn SecretProvider>,
363    scope: TenantScope,
364}
365
366impl TenantSecretProvider {
367    pub fn new(inner: Arc<dyn SecretProvider>, scope: TenantScope) -> Self {
368        Self { inner, scope }
369    }
370
371    fn scoped_id(&self, id: &SecretId) -> Result<SecretId, SecretError> {
372        if id.namespace == self.scope.secret_namespace {
373            return Ok(id.clone());
374        }
375        if id.namespace.starts_with(TENANT_SECRET_NAMESPACE_PREFIX) {
376            return Err(SecretError::NotFound {
377                provider: self.namespace().to_string(),
378                id: id.clone(),
379            });
380        }
381        Ok(SecretId {
382            namespace: self.scope.secret_namespace.clone(),
383            name: id.name.clone(),
384            version: id.version.clone(),
385        })
386    }
387}
388
389#[async_trait]
390impl SecretProvider for TenantSecretProvider {
391    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
392        self.inner.get(&self.scoped_id(id)?).await
393    }
394
395    async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
396        self.inner.put(&self.scoped_id(id)?, value).await
397    }
398
399    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
400        self.inner.rotate(&self.scoped_id(id)?).await
401    }
402
403    async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
404        self.inner.list(&self.scoped_id(prefix)?).await
405    }
406
407    fn namespace(&self) -> &str {
408        &self.scope.secret_namespace
409    }
410
411    fn supports_versions(&self) -> bool {
412        self.inner.supports_versions()
413    }
414}
415
416pub fn tenant_event_topic_prefix(id: &TenantId) -> String {
417    format!("{TENANT_EVENT_TOPIC_PREFIX}{}.", id.0)
418}
419
420pub fn tenant_secret_namespace(id: &TenantId) -> String {
421    format!("{TENANT_SECRET_NAMESPACE_PREFIX}{}", id.0)
422}
423
424pub fn tenant_topic(id: &TenantId, topic: &Topic) -> Result<Topic, LogError> {
425    validate_tenant_id(&id.0).map_err(LogError::InvalidTopic)?;
426    let prefix = tenant_event_topic_prefix(id);
427    if topic.as_str().starts_with(&prefix) {
428        return Ok(topic.clone());
429    }
430    if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
431        return Err(LogError::InvalidTopic(format!(
432            "topic '{}' is outside tenant scope '{}'",
433            topic.as_str(),
434            id.0
435        )));
436    }
437    Topic::new(format!("{prefix}{}", topic.as_str()))
438}
439
/// Checks that `id` is usable as a tenant id.
///
/// A valid id is non-blank and consists only of ASCII letters, digits, `_`
/// and `-`.
///
/// # Errors
/// Returns a human-readable message describing why the id was rejected.
pub fn validate_tenant_id(id: &str) -> Result<(), String> {
    let is_supported = |ch: char| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-';

    if id.trim().is_empty() {
        Err("tenant id cannot be empty".to_string())
    } else if id.chars().any(|ch| !is_supported(ch)) {
        Err(format!(
            "tenant id '{id}' contains unsupported characters; use ASCII letters, numbers, '_' or '-'"
        ))
    } else {
        Ok(())
    }
}
454
455fn registry_path(state_dir: &Path) -> PathBuf {
456    state_dir
457        .join(TENANT_REGISTRY_DIR)
458        .join(TENANT_REGISTRY_FILE)
459}
460
461fn write_file_replace(path: &Path, contents: &[u8]) -> std::io::Result<()> {
462    let dir = path.parent().unwrap_or_else(|| Path::new("."));
463    let tmp_path = dir.join(format!(
464        ".{}.{}.tmp",
465        path.file_name()
466            .and_then(|name| name.to_str())
467            .unwrap_or("registry"),
468        uuid::Uuid::now_v7()
469    ));
470    std::fs::write(&tmp_path, contents)?;
471    #[cfg(windows)]
472    if path.exists() {
473        std::fs::remove_file(path)?;
474    }
475    std::fs::rename(&tmp_path, path).inspect_err(|_| {
476        let _ = std::fs::remove_file(&tmp_path);
477    })?;
478    Ok(())
479}
480
481fn generate_api_key(id: &str) -> String {
482    let random: [u8; 32] = rand::random();
483    format!("harn_tenant_{id}_{}", hex::encode(random))
484}
485
486fn api_key_hash(value: &str) -> String {
487    hex::encode(Sha256::digest(value.as_bytes()))
488}
489
/// Returns the first 18 characters of `value` (or all of it when shorter).
///
/// Counts `char`s rather than bytes, so multi-byte characters are never split.
fn api_key_prefix(value: &str) -> String {
    const PREFIX_CHARS: usize = 18;

    // Byte offset at which the 19th character starts, if there is one.
    let end = value
        .char_indices()
        .nth(PREFIX_CHARS)
        .map_or(value.len(), |(offset, _)| offset);
    value[..end].to_string()
}
493
494fn now_rfc3339() -> String {
495    OffsetDateTime::now_utc()
496        .format(&time::format_description::well_known::Rfc3339)
497        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
498}
499
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    use async_trait::async_trait;

    use super::*;
    use crate::event_log::{EventLog, MemoryEventLog};

    /// Scope of `tenant-a`, rooted at the system temp directory.
    fn tenant_a_scope() -> TenantScope {
        TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir()).expect("scope")
    }

    /// Appends to an unscoped topic must land under the tenant prefix, and
    /// appends to another tenant's topic must be refused.
    #[tokio::test]
    async fn tenant_event_log_enforces_topic_prefix() {
        let shared_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        let tenant_log = Arc::new(TenantEventLog::new(shared_log.clone(), tenant_a_scope()));

        let unscoped = Topic::new("trigger.outbox").unwrap();
        let accepted = LogEvent::new("ok", serde_json::json!({"n": 1}));
        tenant_log.append(&unscoped, accepted).await.unwrap();

        let own_topic = Topic::new("tenant.tenant-a.trigger.outbox").unwrap();
        let stored = shared_log.read_range(&own_topic, None, 10).await.unwrap();
        assert_eq!(stored.len(), 1);

        let foreign_topic = Topic::new("tenant.tenant-b.trigger.outbox").unwrap();
        let rejected = LogEvent::new("bad", serde_json::json!({}));
        let outcome = tenant_log.append(&foreign_topic, rejected).await;
        assert!(outcome.is_err());
    }

    /// Minimal in-memory provider used to observe what the tenant wrapper
    /// forwards.
    struct MemorySecretProvider {
        namespace: String,
        values: Mutex<BTreeMap<SecretId, SecretBytes>>,
    }

    #[async_trait]
    impl SecretProvider for MemorySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            let values = self.values.lock().expect("secret map");
            match values.get(id) {
                Some(secret) => Ok(secret.reborrow()),
                None => Err(SecretError::NotFound {
                    provider: self.namespace.clone(),
                    id: id.clone(),
                }),
            }
        }

        async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
            let mut values = self.values.lock().expect("secret map");
            values.insert(id.clone(), value);
            Ok(())
        }

        async fn rotate(&self, _id: &SecretId) -> Result<RotationHandle, SecretError> {
            Err(SecretError::Unsupported {
                provider: self.namespace.clone(),
                operation: "rotate",
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            &self.namespace
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    /// Secrets written through the wrapper must end up in the tenant
    /// namespace, and ids of other tenants must not resolve.
    #[tokio::test]
    async fn tenant_secret_provider_rescopes_and_denies_cross_tenant_ids() {
        let backing = Arc::new(MemorySecretProvider {
            namespace: "global".to_string(),
            values: Mutex::new(BTreeMap::new()),
        });
        let scope = tenant_a_scope();
        let provider = TenantSecretProvider::new(backing.clone(), scope.clone());

        let requested = SecretId::new("github", "webhook");
        provider
            .put(&requested, SecretBytes::from("a-secret"))
            .await
            .unwrap();

        let rescoped = SecretId::new(scope.secret_namespace, "webhook");
        let stored = backing.get(&rescoped).await.unwrap();
        stored.with_exposed(|bytes| assert_eq!(bytes, b"a-secret"));

        let foreign = SecretId::new("harn.tenant.tenant-b", "webhook");
        assert!(provider.get(&foreign).await.is_err());
    }

    /// Saving must produce the registry file and leave no `.tmp` staging file
    /// behind.
    #[test]
    fn tenant_store_save_replaces_registry_without_temp_leak() {
        let temp = tempfile::tempdir().unwrap();
        let mut store = TenantStore::load(temp.path()).unwrap();
        store
            .create_tenant("tenant-a", TenantBudget::default())
            .unwrap();

        let registry = registry_path(temp.path());
        assert!(registry.is_file());

        let registry_dir = registry.parent().unwrap();
        let mut leaked_temp = false;
        for entry in std::fs::read_dir(registry_dir).unwrap().filter_map(Result::ok) {
            if entry.file_name().to_string_lossy().ends_with(".tmp") {
                leaked_temp = true;
                break;
            }
        }
        assert!(!leaked_temp);
    }
}