Skip to main content

harn_vm/
tenant.rs

1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use futures::stream::BoxStream;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use subtle::ConstantTimeEq;
10use time::OffsetDateTime;
11
12use crate::event_log::{
13    AnyEventLog, CompactReport, ConsumerId, EventId, EventLog, EventLogDescription, LogError,
14    LogEvent, Topic,
15};
16use crate::orchestration::CapabilityPolicy;
17use crate::secrets::{
18    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
19};
20use crate::TenantId;
21
/// Directory under the orchestrator state root that holds the registry file and
/// one state directory per tenant.
pub const TENANT_REGISTRY_DIR: &str = "tenants";

/// File name of the JSON tenant registry inside [`TENANT_REGISTRY_DIR`].
pub const TENANT_REGISTRY_FILE: &str = "registry.json";

/// Prefix shared by every tenant secret namespace (`harn.tenant.<id>`).
pub const TENANT_SECRET_NAMESPACE_PREFIX: &str = "harn.tenant.";

/// Prefix shared by every tenant event-log topic (`tenant.<id>.<topic>`).
pub const TENANT_EVENT_TOPIC_PREFIX: &str = "tenant.";
26
27#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
28#[serde(transparent)]
29pub struct ApiKeyId(pub String);
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32#[serde(default)]
33pub struct TenantBudget {
34    pub daily_cost_usd: Option<f64>,
35    pub hourly_cost_usd: Option<f64>,
36    pub ingest_per_minute: Option<u32>,
37    pub event_log_size_bytes: u64,
38    pub in_flight_dispatches: u32,
39    pub dlq_entries: u32,
40}
41
42impl Default for TenantBudget {
43    fn default() -> Self {
44        Self {
45            daily_cost_usd: None,
46            hourly_cost_usd: None,
47            ingest_per_minute: None,
48            event_log_size_bytes: 10 * 1024 * 1024 * 1024,
49            in_flight_dispatches: 100,
50            dlq_entries: 10_000,
51        }
52    }
53}
54
55#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
56pub struct TenantScope {
57    pub id: TenantId,
58    pub state_root: PathBuf,
59    pub secret_namespace: String,
60    pub event_log_topic_prefix: String,
61    pub capability_ceiling: CapabilityPolicy,
62    pub budget: TenantBudget,
63    pub api_key_ids: Vec<ApiKeyId>,
64}
65
66impl TenantScope {
67    pub fn new(id: TenantId, orchestrator_state_root: impl AsRef<Path>) -> Result<Self, String> {
68        validate_tenant_id(&id.0)?;
69        let state_root = orchestrator_state_root
70            .as_ref()
71            .join(TENANT_REGISTRY_DIR)
72            .join(&id.0);
73        Ok(Self {
74            secret_namespace: tenant_secret_namespace(&id),
75            event_log_topic_prefix: tenant_event_topic_prefix(&id),
76            id,
77            state_root,
78            capability_ceiling: CapabilityPolicy::default(),
79            budget: TenantBudget::default(),
80            api_key_ids: Vec::new(),
81        })
82    }
83
84    pub fn topic(&self, topic: &Topic) -> Result<Topic, LogError> {
85        tenant_topic(&self.id, topic)
86    }
87}
88
89#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(rename_all = "snake_case")]
91pub enum TenantStatus {
92    Active,
93    Suspended,
94}
95
96#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
97pub struct TenantApiKeyRecord {
98    pub id: ApiKeyId,
99    pub hash_sha256: String,
100    pub prefix: String,
101    pub created_at: String,
102}
103
104#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
105pub struct TenantRecord {
106    pub scope: TenantScope,
107    pub status: TenantStatus,
108    pub created_at: String,
109    pub suspended_at: Option<String>,
110    pub api_keys: Vec<TenantApiKeyRecord>,
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
114#[serde(default)]
115pub struct TenantRegistrySnapshot {
116    pub tenants: Vec<TenantRecord>,
117}
118
119#[derive(Clone, Debug)]
120pub struct TenantStore {
121    state_dir: PathBuf,
122    tenants: BTreeMap<String, TenantRecord>,
123}
124
125#[derive(Clone, Debug, PartialEq, Eq)]
126pub enum TenantResolutionError {
127    Unknown,
128    Suspended(TenantId),
129}
130
131impl TenantStore {
132    pub fn load(state_dir: impl AsRef<Path>) -> Result<Self, String> {
133        let state_dir = state_dir.as_ref().to_path_buf();
134        let path = registry_path(&state_dir);
135        if !path.is_file() {
136            return Ok(Self {
137                state_dir,
138                tenants: BTreeMap::new(),
139            });
140        }
141        let content = std::fs::read_to_string(&path)
142            .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
143        let snapshot: TenantRegistrySnapshot = serde_json::from_str(&content)
144            .map_err(|error| format!("failed to parse {}: {error}", path.display()))?;
145        let tenants = snapshot
146            .tenants
147            .into_iter()
148            .map(|record| (record.scope.id.0.clone(), record))
149            .collect();
150        Ok(Self { state_dir, tenants })
151    }
152
153    pub fn save(&self) -> Result<(), String> {
154        let dir = self.state_dir.join(TENANT_REGISTRY_DIR);
155        std::fs::create_dir_all(&dir)
156            .map_err(|error| format!("failed to create {}: {error}", dir.display()))?;
157        let snapshot = TenantRegistrySnapshot {
158            tenants: self.list().to_vec(),
159        };
160        let encoded = serde_json::to_string_pretty(&snapshot).map_err(|error| error.to_string())?;
161        let path = registry_path(&self.state_dir);
162        std::fs::write(&path, encoded)
163            .map_err(|error| format!("failed to write {}: {error}", path.display()))
164    }
165
166    pub fn create_tenant(
167        &mut self,
168        id: impl Into<String>,
169        budget: TenantBudget,
170    ) -> Result<(TenantRecord, String), String> {
171        let id = id.into();
172        validate_tenant_id(&id)?;
173        if self.tenants.contains_key(&id) {
174            return Err(format!("tenant '{id}' already exists"));
175        }
176        let api_key = generate_api_key(&id);
177        let api_key_id = ApiKeyId(format!("key_{}", uuid::Uuid::now_v7()));
178        let created_at = now_rfc3339();
179        let mut scope = TenantScope::new(TenantId::new(id.clone()), &self.state_dir)?;
180        scope.budget = budget;
181        scope.api_key_ids.push(api_key_id.clone());
182        std::fs::create_dir_all(&scope.state_root).map_err(|error| {
183            format!(
184                "failed to create tenant state dir {}: {error}",
185                scope.state_root.display()
186            )
187        })?;
188        let record = TenantRecord {
189            scope,
190            status: TenantStatus::Active,
191            created_at: created_at.clone(),
192            suspended_at: None,
193            api_keys: vec![TenantApiKeyRecord {
194                id: api_key_id,
195                hash_sha256: api_key_hash(&api_key),
196                prefix: api_key_prefix(&api_key),
197                created_at,
198            }],
199        };
200        self.tenants.insert(id, record.clone());
201        self.save()?;
202        Ok((record, api_key))
203    }
204
205    pub fn list(&self) -> Vec<TenantRecord> {
206        self.tenants.values().cloned().collect()
207    }
208
209    pub fn get(&self, id: &str) -> Option<&TenantRecord> {
210        self.tenants.get(id)
211    }
212
213    pub fn suspend(&mut self, id: &str) -> Result<TenantRecord, String> {
214        let record = self
215            .tenants
216            .get_mut(id)
217            .ok_or_else(|| format!("unknown tenant '{id}'"))?;
218        record.status = TenantStatus::Suspended;
219        record.suspended_at = Some(now_rfc3339());
220        let record = record.clone();
221        self.save()?;
222        Ok(record)
223    }
224
225    pub fn delete(&mut self, id: &str) -> Result<TenantRecord, String> {
226        let record = self
227            .tenants
228            .remove(id)
229            .ok_or_else(|| format!("unknown tenant '{id}'"))?;
230        if record.scope.state_root.exists() {
231            std::fs::remove_dir_all(&record.scope.state_root).map_err(|error| {
232                format!(
233                    "failed to remove tenant state dir {}: {error}",
234                    record.scope.state_root.display()
235                )
236            })?;
237        }
238        self.save()?;
239        Ok(record)
240    }
241
242    pub fn resolve_api_key(&self, candidate: &str) -> Result<TenantScope, TenantResolutionError> {
243        let candidate_hash = api_key_hash(candidate);
244        for record in self.tenants.values() {
245            let matched = record.api_keys.iter().any(|key| {
246                key.hash_sha256
247                    .as_bytes()
248                    .ct_eq(candidate_hash.as_bytes())
249                    .into()
250            });
251            if matched {
252                return match record.status {
253                    TenantStatus::Active => Ok(record.scope.clone()),
254                    TenantStatus::Suspended => {
255                        Err(TenantResolutionError::Suspended(record.scope.id.clone()))
256                    }
257                };
258            }
259        }
260        Err(TenantResolutionError::Unknown)
261    }
262}
263
264pub struct TenantEventLog {
265    inner: Arc<AnyEventLog>,
266    scope: TenantScope,
267}
268
269impl TenantEventLog {
270    pub fn new(inner: Arc<AnyEventLog>, scope: TenantScope) -> Self {
271        Self { inner, scope }
272    }
273
274    pub fn scope(&self) -> &TenantScope {
275        &self.scope
276    }
277
278    fn scoped_topic(&self, topic: &Topic) -> Result<Topic, LogError> {
279        if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
280            if topic
281                .as_str()
282                .starts_with(&self.scope.event_log_topic_prefix)
283            {
284                return Ok(topic.clone());
285            }
286            return Err(LogError::InvalidTopic(format!(
287                "topic '{}' is outside tenant scope '{}'",
288                topic.as_str(),
289                self.scope.id.0
290            )));
291        }
292        self.scope.topic(topic)
293    }
294}
295
296impl EventLog for TenantEventLog {
297    fn describe(&self) -> EventLogDescription {
298        self.inner.describe()
299    }
300
301    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
302        self.inner.append(&self.scoped_topic(topic)?, event).await
303    }
304
305    async fn flush(&self) -> Result<(), LogError> {
306        self.inner.flush().await
307    }
308
309    async fn read_range(
310        &self,
311        topic: &Topic,
312        from: Option<EventId>,
313        limit: usize,
314    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
315        self.inner
316            .read_range(&self.scoped_topic(topic)?, from, limit)
317            .await
318    }
319
320    async fn subscribe(
321        self: Arc<Self>,
322        topic: &Topic,
323        from: Option<EventId>,
324    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
325        self.inner
326            .clone()
327            .subscribe(&self.scoped_topic(topic)?, from)
328            .await
329    }
330
331    async fn ack(
332        &self,
333        topic: &Topic,
334        consumer: &ConsumerId,
335        up_to: EventId,
336    ) -> Result<(), LogError> {
337        self.inner
338            .ack(&self.scoped_topic(topic)?, consumer, up_to)
339            .await
340    }
341
342    async fn consumer_cursor(
343        &self,
344        topic: &Topic,
345        consumer: &ConsumerId,
346    ) -> Result<Option<EventId>, LogError> {
347        self.inner
348            .consumer_cursor(&self.scoped_topic(topic)?, consumer)
349            .await
350    }
351
352    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
353        self.inner.latest(&self.scoped_topic(topic)?).await
354    }
355
356    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
357        self.inner.compact(&self.scoped_topic(topic)?, before).await
358    }
359}
360
361pub struct TenantSecretProvider {
362    inner: Arc<dyn SecretProvider>,
363    scope: TenantScope,
364}
365
366impl TenantSecretProvider {
367    pub fn new(inner: Arc<dyn SecretProvider>, scope: TenantScope) -> Self {
368        Self { inner, scope }
369    }
370
371    fn scoped_id(&self, id: &SecretId) -> Result<SecretId, SecretError> {
372        if id.namespace == self.scope.secret_namespace {
373            return Ok(id.clone());
374        }
375        if id.namespace.starts_with(TENANT_SECRET_NAMESPACE_PREFIX) {
376            return Err(SecretError::NotFound {
377                provider: self.namespace().to_string(),
378                id: id.clone(),
379            });
380        }
381        Ok(SecretId {
382            namespace: self.scope.secret_namespace.clone(),
383            name: id.name.clone(),
384            version: id.version.clone(),
385        })
386    }
387}
388
389#[async_trait]
390impl SecretProvider for TenantSecretProvider {
391    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
392        self.inner.get(&self.scoped_id(id)?).await
393    }
394
395    async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
396        self.inner.put(&self.scoped_id(id)?, value).await
397    }
398
399    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
400        self.inner.rotate(&self.scoped_id(id)?).await
401    }
402
403    async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
404        self.inner.list(&self.scoped_id(prefix)?).await
405    }
406
407    fn namespace(&self) -> &str {
408        &self.scope.secret_namespace
409    }
410
411    fn supports_versions(&self) -> bool {
412        self.inner.supports_versions()
413    }
414}
415
416pub fn tenant_event_topic_prefix(id: &TenantId) -> String {
417    format!("{TENANT_EVENT_TOPIC_PREFIX}{}.", id.0)
418}
419
420pub fn tenant_secret_namespace(id: &TenantId) -> String {
421    format!("{TENANT_SECRET_NAMESPACE_PREFIX}{}", id.0)
422}
423
424pub fn tenant_topic(id: &TenantId, topic: &Topic) -> Result<Topic, LogError> {
425    validate_tenant_id(&id.0).map_err(LogError::InvalidTopic)?;
426    let prefix = tenant_event_topic_prefix(id);
427    if topic.as_str().starts_with(&prefix) {
428        return Ok(topic.clone());
429    }
430    if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
431        return Err(LogError::InvalidTopic(format!(
432            "topic '{}' is outside tenant scope '{}'",
433            topic.as_str(),
434            id.0
435        )));
436    }
437    Topic::new(format!("{prefix}{}", topic.as_str()))
438}
439
/// Checks that `id` is usable as a tenant id.
///
/// A valid id is non-blank and made only of ASCII letters, digits, `_` and `-`.
/// That keeps it safe to use as a single path component and inside topic and
/// namespace names.
///
/// # Errors
///
/// Returns a human-readable message describing the first rule that failed.
pub fn validate_tenant_id(id: &str) -> Result<(), String> {
    let is_allowed = |ch: char| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-';
    if id.chars().all(char::is_whitespace) {
        Err("tenant id cannot be empty".to_string())
    } else if id.chars().any(|ch| !is_allowed(ch)) {
        Err(format!(
            "tenant id '{id}' contains unsupported characters; use ASCII letters, numbers, '_' or '-'"
        ))
    } else {
        Ok(())
    }
}
454
455fn registry_path(state_dir: &Path) -> PathBuf {
456    state_dir
457        .join(TENANT_REGISTRY_DIR)
458        .join(TENANT_REGISTRY_FILE)
459}
460
461fn generate_api_key(id: &str) -> String {
462    let random: [u8; 32] = rand::random();
463    format!("harn_tenant_{id}_{}", hex::encode(random))
464}
465
466fn api_key_hash(value: &str) -> String {
467    hex::encode(Sha256::digest(value.as_bytes()))
468}
469
/// First 18 characters (not bytes) of `value`, or all of it if shorter.
///
/// Stored alongside the hash so an issued key can be recognized without keeping
/// the full secret.
fn api_key_prefix(value: &str) -> String {
    const PREFIX_CHARS: usize = 18;
    let cut = value
        .char_indices()
        .nth(PREFIX_CHARS)
        .map_or(value.len(), |(offset, _)| offset);
    value[..cut].to_owned()
}
473
474fn now_rfc3339() -> String {
475    OffsetDateTime::now_utc()
476        .format(&time::format_description::well_known::Rfc3339)
477        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
478}
479
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    use async_trait::async_trait;

    use super::*;
    use crate::event_log::{EventLog, MemoryEventLog};

    #[tokio::test]
    async fn tenant_event_log_enforces_topic_prefix() {
        let inner = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        let scope =
            TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir()).expect("scope");
        let tenant_log = Arc::new(TenantEventLog::new(inner.clone(), scope));
        let base = Topic::new("trigger.outbox").unwrap();

        tenant_log
            .append(&base, LogEvent::new("ok", serde_json::json!({"n": 1})))
            .await
            .unwrap();

        let scoped = Topic::new("tenant.tenant-a.trigger.outbox").unwrap();
        assert_eq!(inner.read_range(&scoped, None, 10).await.unwrap().len(), 1);
        let other = Topic::new("tenant.tenant-b.trigger.outbox").unwrap();
        assert!(tenant_log
            .append(&other, LogEvent::new("bad", serde_json::json!({})))
            .await
            .is_err());
    }

    /// Minimal in-memory provider used to observe which ids the tenant wrapper emits.
    struct MemorySecretProvider {
        namespace: String,
        values: Mutex<BTreeMap<SecretId, SecretBytes>>,
    }

    #[async_trait]
    impl SecretProvider for MemorySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            self.values
                .lock()
                .expect("secret map")
                .get(id)
                .map(SecretBytes::reborrow)
                .ok_or_else(|| SecretError::NotFound {
                    provider: self.namespace.clone(),
                    id: id.clone(),
                })
        }

        async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
            self.values
                .lock()
                .expect("secret map")
                .insert(id.clone(), value);
            Ok(())
        }

        async fn rotate(&self, _id: &SecretId) -> Result<RotationHandle, SecretError> {
            Err(SecretError::Unsupported {
                provider: self.namespace.clone(),
                operation: "rotate",
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            &self.namespace
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    #[tokio::test]
    async fn tenant_secret_provider_rescopes_and_denies_cross_tenant_ids() {
        let inner = Arc::new(MemorySecretProvider {
            namespace: "global".to_string(),
            values: Mutex::new(BTreeMap::new()),
        });
        let scope =
            TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir()).expect("scope");
        let provider = TenantSecretProvider::new(inner.clone(), scope.clone());

        provider
            .put(
                &SecretId::new("github", "webhook"),
                SecretBytes::from("a-secret"),
            )
            .await
            .unwrap();

        let scoped_id = SecretId::new(scope.secret_namespace, "webhook");
        let value = inner.get(&scoped_id).await.unwrap();
        value.with_exposed(|bytes| assert_eq!(bytes, b"a-secret"));

        let cross = SecretId::new("harn.tenant.tenant-b", "webhook");
        assert!(provider.get(&cross).await.is_err());
    }

    #[test]
    fn validate_tenant_id_rejects_blank_and_path_like_ids() {
        assert!(validate_tenant_id("tenant-a_01").is_ok());
        assert!(validate_tenant_id("").is_err());
        assert!(validate_tenant_id("   ").is_err());
        assert!(validate_tenant_id("../escape").is_err());
        assert!(validate_tenant_id("a/b").is_err());
    }

    #[test]
    fn tenant_topic_prefixes_plain_topics_and_rejects_foreign_tenants() {
        let id = TenantId::new("tenant-a");

        let plain = Topic::new("trigger.outbox").unwrap();
        assert_eq!(
            tenant_topic(&id, &plain).unwrap().as_str(),
            "tenant.tenant-a.trigger.outbox"
        );

        let own = Topic::new("tenant.tenant-a.trigger.outbox").unwrap();
        assert_eq!(tenant_topic(&id, &own).unwrap().as_str(), own.as_str());

        // The trailing dot in the prefix keeps `tenant-a` from matching `tenant-ab`.
        let foreign = Topic::new("tenant.tenant-ab.trigger.outbox").unwrap();
        assert!(tenant_topic(&id, &foreign).is_err());
    }

    #[test]
    fn tenant_store_round_trips_and_resolves_api_keys() {
        let state_dir =
            std::env::temp_dir().join(format!("harn-tenant-store-{}", uuid::Uuid::now_v7()));
        let mut store = TenantStore::load(&state_dir).expect("load empty registry");
        assert!(store.list().is_empty());

        let (record, api_key) = store
            .create_tenant("tenant-a", TenantBudget::default())
            .expect("create tenant");
        assert_eq!(record.status, TenantStatus::Active);
        assert!(record.scope.state_root.is_dir());
        assert!(store
            .create_tenant("tenant-a", TenantBudget::default())
            .is_err());

        assert_eq!(store.resolve_api_key(&api_key), Ok(record.scope.clone()));
        assert_eq!(
            store.resolve_api_key("harn_tenant_tenant-a_bogus"),
            Err(TenantResolutionError::Unknown)
        );

        let reloaded = TenantStore::load(&state_dir).expect("reload registry");
        assert_eq!(
            reloaded.get("tenant-a").map(|stored| stored.api_keys.clone()),
            Some(record.api_keys.clone())
        );
        assert!(reloaded.resolve_api_key(&api_key).is_ok());

        let suspended = store.suspend("tenant-a").expect("suspend");
        assert_eq!(suspended.status, TenantStatus::Suspended);
        assert!(suspended.suspended_at.is_some());
        assert_eq!(
            store.resolve_api_key(&api_key),
            Err(TenantResolutionError::Suspended(TenantId::new("tenant-a")))
        );

        let removed = store.delete("tenant-a").expect("delete");
        assert!(!removed.scope.state_root.exists());
        assert!(store.get("tenant-a").is_none());
        assert!(TenantStore::load(&state_dir)
            .expect("reload after delete")
            .list()
            .is_empty());

        let _ = std::fs::remove_dir_all(&state_dir);
    }
}