1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use futures::stream::BoxStream;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use subtle::ConstantTimeEq;
10use time::OffsetDateTime;
11
12use crate::event_log::{
13 AnyEventLog, CompactReport, ConsumerId, EventId, EventLog, EventLogDescription, LogError,
14 LogEvent, Topic,
15};
16use crate::orchestration::CapabilityPolicy;
17use crate::secrets::{
18 RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
19};
20use crate::TenantId;
21
22pub const TENANT_REGISTRY_DIR: &str = "tenants";
23pub const TENANT_REGISTRY_FILE: &str = "registry.json";
24pub const TENANT_SECRET_NAMESPACE_PREFIX: &str = "harn.tenant.";
25pub const TENANT_EVENT_TOPIC_PREFIX: &str = "tenant.";
26
27#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
28#[serde(transparent)]
29pub struct ApiKeyId(pub String);
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32#[serde(default)]
33pub struct TenantBudget {
34 pub daily_cost_usd: Option<f64>,
35 pub hourly_cost_usd: Option<f64>,
36 pub ingest_per_minute: Option<u32>,
37 pub event_log_size_bytes: u64,
38 pub in_flight_dispatches: u32,
39 pub dlq_entries: u32,
40}
41
42impl Default for TenantBudget {
43 fn default() -> Self {
44 Self {
45 daily_cost_usd: None,
46 hourly_cost_usd: None,
47 ingest_per_minute: None,
48 event_log_size_bytes: 10 * 1024 * 1024 * 1024,
49 in_flight_dispatches: 100,
50 dlq_entries: 10_000,
51 }
52 }
53}
54
55#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
56pub struct TenantScope {
57 pub id: TenantId,
58 pub state_root: PathBuf,
59 pub secret_namespace: String,
60 pub event_log_topic_prefix: String,
61 pub capability_ceiling: CapabilityPolicy,
62 pub budget: TenantBudget,
63 pub api_key_ids: Vec<ApiKeyId>,
64}
65
66impl TenantScope {
67 pub fn new(id: TenantId, orchestrator_state_root: impl AsRef<Path>) -> Result<Self, String> {
68 validate_tenant_id(&id.0)?;
69 let state_root = orchestrator_state_root
70 .as_ref()
71 .join(TENANT_REGISTRY_DIR)
72 .join(&id.0);
73 Ok(Self {
74 secret_namespace: tenant_secret_namespace(&id),
75 event_log_topic_prefix: tenant_event_topic_prefix(&id),
76 id,
77 state_root,
78 capability_ceiling: CapabilityPolicy::default(),
79 budget: TenantBudget::default(),
80 api_key_ids: Vec::new(),
81 })
82 }
83
84 pub fn topic(&self, topic: &Topic) -> Result<Topic, LogError> {
85 tenant_topic(&self.id, topic)
86 }
87}
88
89#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(rename_all = "snake_case")]
91pub enum TenantStatus {
92 Active,
93 Suspended,
94}
95
96#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
97pub struct TenantApiKeyRecord {
98 pub id: ApiKeyId,
99 pub hash_sha256: String,
100 pub prefix: String,
101 pub created_at: String,
102}
103
104#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
105pub struct TenantRecord {
106 pub scope: TenantScope,
107 pub status: TenantStatus,
108 pub created_at: String,
109 pub suspended_at: Option<String>,
110 pub api_keys: Vec<TenantApiKeyRecord>,
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
114#[serde(default)]
115pub struct TenantRegistrySnapshot {
116 pub tenants: Vec<TenantRecord>,
117}
118
119#[derive(Clone, Debug)]
120pub struct TenantStore {
121 state_dir: PathBuf,
122 tenants: BTreeMap<String, TenantRecord>,
123}
124
125#[derive(Clone, Debug, PartialEq, Eq)]
126pub enum TenantResolutionError {
127 Unknown,
128 Suspended(TenantId),
129}
130
131impl TenantStore {
132 pub fn load(state_dir: impl AsRef<Path>) -> Result<Self, String> {
133 let state_dir = state_dir.as_ref().to_path_buf();
134 let path = registry_path(&state_dir);
135 if !path.is_file() {
136 return Ok(Self {
137 state_dir,
138 tenants: BTreeMap::new(),
139 });
140 }
141 let content = std::fs::read_to_string(&path)
142 .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
143 let snapshot: TenantRegistrySnapshot = serde_json::from_str(&content)
144 .map_err(|error| format!("failed to parse {}: {error}", path.display()))?;
145 let tenants = snapshot
146 .tenants
147 .into_iter()
148 .map(|record| (record.scope.id.0.clone(), record))
149 .collect();
150 Ok(Self { state_dir, tenants })
151 }
152
153 pub fn save(&self) -> Result<(), String> {
154 let dir = self.state_dir.join(TENANT_REGISTRY_DIR);
155 std::fs::create_dir_all(&dir)
156 .map_err(|error| format!("failed to create {}: {error}", dir.display()))?;
157 let snapshot = TenantRegistrySnapshot {
158 tenants: self.list().to_vec(),
159 };
160 let encoded = serde_json::to_string_pretty(&snapshot).map_err(|error| error.to_string())?;
161 let path = registry_path(&self.state_dir);
162 write_file_replace(&path, encoded.as_bytes())
163 .map_err(|error| format!("failed to write {}: {error}", path.display()))
164 }
165
166 pub fn create_tenant(
167 &mut self,
168 id: impl Into<String>,
169 budget: TenantBudget,
170 ) -> Result<(TenantRecord, String), String> {
171 let id = id.into();
172 validate_tenant_id(&id)?;
173 if self.tenants.contains_key(&id) {
174 return Err(format!("tenant '{id}' already exists"));
175 }
176 let api_key = generate_api_key(&id);
177 let api_key_id = ApiKeyId(format!("key_{}", uuid::Uuid::now_v7()));
178 let created_at = now_rfc3339();
179 let mut scope = TenantScope::new(TenantId::new(id.clone()), &self.state_dir)?;
180 scope.budget = budget;
181 scope.api_key_ids.push(api_key_id.clone());
182 std::fs::create_dir_all(&scope.state_root).map_err(|error| {
183 format!(
184 "failed to create tenant state dir {}: {error}",
185 scope.state_root.display()
186 )
187 })?;
188 let record = TenantRecord {
189 scope,
190 status: TenantStatus::Active,
191 created_at: created_at.clone(),
192 suspended_at: None,
193 api_keys: vec![TenantApiKeyRecord {
194 id: api_key_id,
195 hash_sha256: api_key_hash(&api_key),
196 prefix: api_key_prefix(&api_key),
197 created_at,
198 }],
199 };
200 self.tenants.insert(id, record.clone());
201 self.save()?;
202 Ok((record, api_key))
203 }
204
205 pub fn list(&self) -> Vec<TenantRecord> {
206 self.tenants.values().cloned().collect()
207 }
208
209 pub fn get(&self, id: &str) -> Option<&TenantRecord> {
210 self.tenants.get(id)
211 }
212
213 pub fn suspend(&mut self, id: &str) -> Result<TenantRecord, String> {
214 let record = self
215 .tenants
216 .get_mut(id)
217 .ok_or_else(|| format!("unknown tenant '{id}'"))?;
218 record.status = TenantStatus::Suspended;
219 record.suspended_at = Some(now_rfc3339());
220 let record = record.clone();
221 self.save()?;
222 Ok(record)
223 }
224
225 pub fn delete(&mut self, id: &str) -> Result<TenantRecord, String> {
226 let record = self
227 .tenants
228 .remove(id)
229 .ok_or_else(|| format!("unknown tenant '{id}'"))?;
230 if record.scope.state_root.exists() {
231 std::fs::remove_dir_all(&record.scope.state_root).map_err(|error| {
232 format!(
233 "failed to remove tenant state dir {}: {error}",
234 record.scope.state_root.display()
235 )
236 })?;
237 }
238 self.save()?;
239 Ok(record)
240 }
241
242 pub fn resolve_api_key(&self, candidate: &str) -> Result<TenantScope, TenantResolutionError> {
243 let candidate_hash = api_key_hash(candidate);
244 for record in self.tenants.values() {
245 let matched = record.api_keys.iter().any(|key| {
246 key.hash_sha256
247 .as_bytes()
248 .ct_eq(candidate_hash.as_bytes())
249 .into()
250 });
251 if matched {
252 return match record.status {
253 TenantStatus::Active => Ok(record.scope.clone()),
254 TenantStatus::Suspended => {
255 Err(TenantResolutionError::Suspended(record.scope.id.clone()))
256 }
257 };
258 }
259 }
260 Err(TenantResolutionError::Unknown)
261 }
262}
263
264pub struct TenantEventLog {
265 inner: Arc<AnyEventLog>,
266 scope: TenantScope,
267}
268
269impl TenantEventLog {
270 pub fn new(inner: Arc<AnyEventLog>, scope: TenantScope) -> Self {
271 Self { inner, scope }
272 }
273
274 pub fn scope(&self) -> &TenantScope {
275 &self.scope
276 }
277
278 fn scoped_topic(&self, topic: &Topic) -> Result<Topic, LogError> {
279 if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
280 if topic
281 .as_str()
282 .starts_with(&self.scope.event_log_topic_prefix)
283 {
284 return Ok(topic.clone());
285 }
286 return Err(LogError::InvalidTopic(format!(
287 "topic '{}' is outside tenant scope '{}'",
288 topic.as_str(),
289 self.scope.id.0
290 )));
291 }
292 self.scope.topic(topic)
293 }
294}
295
296impl EventLog for TenantEventLog {
297 fn describe(&self) -> EventLogDescription {
298 self.inner.describe()
299 }
300
301 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
302 self.inner.append(&self.scoped_topic(topic)?, event).await
303 }
304
305 async fn flush(&self) -> Result<(), LogError> {
306 self.inner.flush().await
307 }
308
309 async fn read_range(
310 &self,
311 topic: &Topic,
312 from: Option<EventId>,
313 limit: usize,
314 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
315 self.inner
316 .read_range(&self.scoped_topic(topic)?, from, limit)
317 .await
318 }
319
320 async fn subscribe(
321 self: Arc<Self>,
322 topic: &Topic,
323 from: Option<EventId>,
324 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
325 self.inner
326 .clone()
327 .subscribe(&self.scoped_topic(topic)?, from)
328 .await
329 }
330
331 async fn ack(
332 &self,
333 topic: &Topic,
334 consumer: &ConsumerId,
335 up_to: EventId,
336 ) -> Result<(), LogError> {
337 self.inner
338 .ack(&self.scoped_topic(topic)?, consumer, up_to)
339 .await
340 }
341
342 async fn consumer_cursor(
343 &self,
344 topic: &Topic,
345 consumer: &ConsumerId,
346 ) -> Result<Option<EventId>, LogError> {
347 self.inner
348 .consumer_cursor(&self.scoped_topic(topic)?, consumer)
349 .await
350 }
351
352 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
353 self.inner.latest(&self.scoped_topic(topic)?).await
354 }
355
356 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
357 self.inner.compact(&self.scoped_topic(topic)?, before).await
358 }
359}
360
361pub struct TenantSecretProvider {
362 inner: Arc<dyn SecretProvider>,
363 scope: TenantScope,
364}
365
366impl TenantSecretProvider {
367 pub fn new(inner: Arc<dyn SecretProvider>, scope: TenantScope) -> Self {
368 Self { inner, scope }
369 }
370
371 fn scoped_id(&self, id: &SecretId) -> Result<SecretId, SecretError> {
372 if id.namespace == self.scope.secret_namespace {
373 return Ok(id.clone());
374 }
375 if id.namespace.starts_with(TENANT_SECRET_NAMESPACE_PREFIX) {
376 return Err(SecretError::NotFound {
377 provider: self.namespace().to_string(),
378 id: id.clone(),
379 });
380 }
381 Ok(SecretId {
382 namespace: self.scope.secret_namespace.clone(),
383 name: id.name.clone(),
384 version: id.version.clone(),
385 })
386 }
387}
388
389#[async_trait]
390impl SecretProvider for TenantSecretProvider {
391 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
392 self.inner.get(&self.scoped_id(id)?).await
393 }
394
395 async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
396 self.inner.put(&self.scoped_id(id)?, value).await
397 }
398
399 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
400 self.inner.rotate(&self.scoped_id(id)?).await
401 }
402
403 async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
404 self.inner.list(&self.scoped_id(prefix)?).await
405 }
406
407 fn namespace(&self) -> &str {
408 &self.scope.secret_namespace
409 }
410
411 fn supports_versions(&self) -> bool {
412 self.inner.supports_versions()
413 }
414}
415
416pub fn tenant_event_topic_prefix(id: &TenantId) -> String {
417 format!("{TENANT_EVENT_TOPIC_PREFIX}{}.", id.0)
418}
419
420pub fn tenant_secret_namespace(id: &TenantId) -> String {
421 format!("{TENANT_SECRET_NAMESPACE_PREFIX}{}", id.0)
422}
423
424pub fn tenant_topic(id: &TenantId, topic: &Topic) -> Result<Topic, LogError> {
425 validate_tenant_id(&id.0).map_err(LogError::InvalidTopic)?;
426 let prefix = tenant_event_topic_prefix(id);
427 if topic.as_str().starts_with(&prefix) {
428 return Ok(topic.clone());
429 }
430 if topic.as_str().starts_with(TENANT_EVENT_TOPIC_PREFIX) {
431 return Err(LogError::InvalidTopic(format!(
432 "topic '{}' is outside tenant scope '{}'",
433 topic.as_str(),
434 id.0
435 )));
436 }
437 Topic::new(format!("{prefix}{}", topic.as_str()))
438}
439
/// Checks that `id` is usable as a tenant id.
///
/// Tenant ids are embedded in filesystem paths, topic names and secret
/// namespaces. Only ASCII letters, digits, `_` and `-` are accepted, which
/// excludes `/`, `.` and other separator characters.
///
/// # Errors
/// A human-readable message when `id` is empty or whitespace-only, or when
/// it contains any other character.
pub fn validate_tenant_id(id: &str) -> Result<(), String> {
    if id.chars().all(char::is_whitespace) {
        return Err("tenant id cannot be empty".to_string());
    }
    let allowed = |ch: char| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-';
    if id.chars().any(|ch| !allowed(ch)) {
        return Err(format!(
            "tenant id '{id}' contains unsupported characters; use ASCII letters, numbers, '_' or '-'"
        ));
    }
    Ok(())
}
454
455fn registry_path(state_dir: &Path) -> PathBuf {
456 state_dir
457 .join(TENANT_REGISTRY_DIR)
458 .join(TENANT_REGISTRY_FILE)
459}
460
461fn write_file_replace(path: &Path, contents: &[u8]) -> std::io::Result<()> {
462 let dir = path.parent().unwrap_or_else(|| Path::new("."));
463 let tmp_path = dir.join(format!(
464 ".{}.{}.tmp",
465 path.file_name()
466 .and_then(|name| name.to_str())
467 .unwrap_or("registry"),
468 uuid::Uuid::now_v7()
469 ));
470 std::fs::write(&tmp_path, contents)?;
471 #[cfg(windows)]
472 if path.exists() {
473 std::fs::remove_file(path)?;
474 }
475 std::fs::rename(&tmp_path, path).inspect_err(|_| {
476 let _ = std::fs::remove_file(&tmp_path);
477 })?;
478 Ok(())
479}
480
481fn generate_api_key(id: &str) -> String {
482 let random: [u8; 32] = rand::random();
483 format!("harn_tenant_{id}_{}", hex::encode(random))
484}
485
486fn api_key_hash(value: &str) -> String {
487 hex::encode(Sha256::digest(value.as_bytes()))
488}
489
/// First 18 characters (not bytes) of `value`, or all of it if shorter.
///
/// NOTE(review): for keys from `generate_api_key` with tenant ids of six or
/// more characters, this is `harn_tenant_` plus the start of the id. It then
/// contains no random material and is the same for every key of a tenant.
/// Confirm that is intended.
fn api_key_prefix(value: &str) -> String {
    const PREFIX_CHARS: usize = 18;
    let end = value
        .char_indices()
        .nth(PREFIX_CHARS)
        .map_or(value.len(), |(byte_index, _)| byte_index);
    value[..end].to_owned()
}
493
494fn now_rfc3339() -> String {
495 OffsetDateTime::now_utc()
496 .format(&time::format_description::well_known::Rfc3339)
497 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
498}
499
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    use async_trait::async_trait;

    use super::*;
    use crate::event_log::{EventLog, MemoryEventLog};

    #[tokio::test]
    async fn tenant_event_log_enforces_topic_prefix() {
        let inner = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        let scope =
            TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir()).expect("scope");
        let tenant_log = Arc::new(TenantEventLog::new(inner.clone(), scope));
        let base = Topic::new("trigger.outbox").unwrap();

        tenant_log
            .append(&base, LogEvent::new("ok", serde_json::json!({"n": 1})))
            .await
            .unwrap();

        let scoped = Topic::new("tenant.tenant-a.trigger.outbox").unwrap();
        assert_eq!(inner.read_range(&scoped, None, 10).await.unwrap().len(), 1);
        let other = Topic::new("tenant.tenant-b.trigger.outbox").unwrap();
        assert!(tenant_log
            .append(&other, LogEvent::new("bad", serde_json::json!({})))
            .await
            .is_err());
    }

    /// In-memory provider used to observe which ids the tenant wrapper forwards.
    struct MemorySecretProvider {
        namespace: String,
        values: Mutex<BTreeMap<SecretId, SecretBytes>>,
    }

    #[async_trait]
    impl SecretProvider for MemorySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            self.values
                .lock()
                .expect("secret map")
                .get(id)
                .map(SecretBytes::reborrow)
                .ok_or_else(|| SecretError::NotFound {
                    provider: self.namespace.clone(),
                    id: id.clone(),
                })
        }

        async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
            self.values
                .lock()
                .expect("secret map")
                .insert(id.clone(), value);
            Ok(())
        }

        async fn rotate(&self, _id: &SecretId) -> Result<RotationHandle, SecretError> {
            Err(SecretError::Unsupported {
                provider: self.namespace.clone(),
                operation: "rotate",
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            &self.namespace
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    #[tokio::test]
    async fn tenant_secret_provider_rescopes_and_denies_cross_tenant_ids() {
        let inner = Arc::new(MemorySecretProvider {
            namespace: "global".to_string(),
            values: Mutex::new(BTreeMap::new()),
        });
        let scope =
            TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir()).expect("scope");
        let provider = TenantSecretProvider::new(inner.clone(), scope.clone());

        provider
            .put(
                &SecretId::new("github", "webhook"),
                SecretBytes::from("a-secret"),
            )
            .await
            .unwrap();

        let scoped_id = SecretId::new(scope.secret_namespace, "webhook");
        let value = inner.get(&scoped_id).await.unwrap();
        value.with_exposed(|bytes| assert_eq!(bytes, b"a-secret"));

        let cross = SecretId::new("harn.tenant.tenant-b", "webhook");
        assert!(provider.get(&cross).await.is_err());
    }

    #[test]
    fn tenant_store_save_replaces_registry_without_temp_leak() {
        let temp = tempfile::tempdir().unwrap();
        let mut store = TenantStore::load(temp.path()).unwrap();
        store
            .create_tenant("tenant-a", TenantBudget::default())
            .unwrap();

        let registry = registry_path(temp.path());
        assert!(registry.is_file());
        let leaked_temp = std::fs::read_dir(registry.parent().unwrap())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| entry.file_name().to_string_lossy().ends_with(".tmp"));
        assert!(!leaked_temp);
    }

    #[test]
    fn tenant_store_resolves_keys_and_honors_suspension() {
        let temp = tempfile::tempdir().unwrap();
        let mut store = TenantStore::load(temp.path()).unwrap();
        let (record, api_key) = store
            .create_tenant("tenant-a", TenantBudget::default())
            .unwrap();

        // Only the hash (and a short prefix) is persisted, never the full key.
        let raw = std::fs::read_to_string(registry_path(temp.path())).unwrap();
        assert!(!raw.contains(&api_key));

        assert_eq!(store.resolve_api_key(&api_key), Ok(record.scope.clone()));
        assert_eq!(
            store.resolve_api_key("harn_tenant_tenant-a_not-a-real-key"),
            Err(TenantResolutionError::Unknown)
        );
        assert!(store
            .create_tenant("tenant-a", TenantBudget::default())
            .is_err());

        store.suspend("tenant-a").unwrap();
        assert_eq!(
            store.resolve_api_key(&api_key),
            Err(TenantResolutionError::Suspended(TenantId::new("tenant-a")))
        );

        let reloaded = TenantStore::load(temp.path()).unwrap();
        assert_eq!(
            reloaded.get("tenant-a").map(|record| record.status),
            Some(TenantStatus::Suspended)
        );
        assert!(reloaded.resolve_api_key(&api_key).is_err());
    }

    #[test]
    fn tenant_store_delete_removes_state_and_registry_entry() {
        let temp = tempfile::tempdir().unwrap();
        let mut store = TenantStore::load(temp.path()).unwrap();
        let (record, _api_key) = store
            .create_tenant("tenant-a", TenantBudget::default())
            .unwrap();
        assert!(record.scope.state_root.is_dir());

        store.delete("tenant-a").unwrap();
        assert!(!record.scope.state_root.exists());
        assert!(store.get("tenant-a").is_none());
        assert!(TenantStore::load(temp.path())
            .unwrap()
            .get("tenant-a")
            .is_none());
        assert!(store.delete("tenant-a").is_err());
    }

    #[test]
    fn tenant_topic_prefixes_once_and_rejects_foreign_tenants() {
        let id = TenantId::new("tenant-a");
        let base = Topic::new("trigger.outbox").unwrap();
        let scoped = tenant_topic(&id, &base).unwrap();
        assert_eq!(scoped.as_str(), "tenant.tenant-a.trigger.outbox");
        // Already-scoped topics pass through unchanged.
        assert_eq!(tenant_topic(&id, &scoped).unwrap().as_str(), scoped.as_str());
        let foreign = Topic::new("tenant.tenant-b.trigger.outbox").unwrap();
        assert!(tenant_topic(&id, &foreign).is_err());
    }

    #[test]
    fn validate_tenant_id_rejects_path_like_ids() {
        assert!(validate_tenant_id("tenant-a_1").is_ok());
        for bad in ["", "  ", "..", "a/b", "a.b", "tenant a"] {
            assert!(validate_tenant_id(bad).is_err(), "{bad:?} should be rejected");
        }
    }
}