1use crate::local_cache::{build_account_identifiers_locally, build_region_identifiers_locally};
2use crate::proxy::{get_proxies, get_proxy_by_id};
3use crate::redis_cache::RedisCache;
4use crate::{
5 get_account_by_id, get_accounts, get_asset, get_assets, get_ddi, get_ddis, get_device,
6 get_devices, get_hook, get_hooks, get_region_by_id, get_regions, get_trunk, get_trunk_and_ddi,
7 get_trunks,
8};
9use cal_core::device::device::DeviceStruct;
10use cal_core::{AccountLite, Asset, FlowState, Hook, Proxy, RedisEvent, Region, Trunk, DDI};
11use moka::sync::Cache;
12use redis::aio::MultiplexedConnection;
13use redis::{AsyncCommands, PushKind, RedisError, Value};
14use std::collections::HashMap;
15use std::env;
16use std::time::Duration;
17use tokio::sync::mpsc::unbounded_channel;
18
/// Two-tier cache facade: in-process caches in front of a Redis connection.
///
/// Lookups on this type consult `local_cache` first where a local entry can
/// exist and fall back to `remote_cache` otherwise, storing what they fetch.
#[derive(Clone)]
pub struct CallableCache {
    /// In-process caches populated from Redis reads and pub/sub events.
    pub local_cache: LocalCache,
    /// Handle holding the Redis connection used for remote reads.
    pub remote_cache: RedisCache,
}
28
/// In-process caches, one per entity kind, all keyed by `String`.
///
/// Capacities and time-to-live values are configured in `create_pool`.
#[derive(Clone)]
pub struct LocalCache {
    /// Flow state entries; not read or written by the code in this file.
    pub flow_state: Cache<String, FlowState>,
    /// Regions keyed by region id.
    pub regions: Cache<String, Region>,
    /// Maps a lookup identifier to the key used in `regions`.
    pub region_idents: Cache<String, String>,
    /// Accounts keyed by account id.
    pub accounts: Cache<String, AccountLite>,
    /// Maps a lookup identifier to the key used in `accounts`.
    pub account_idents: Cache<String, String>,
    /// Devices keyed by `"{account_id}:{device_id}"`.
    pub devices: Cache<String, DeviceStruct>,
    /// Trunks keyed by `"{account_id}:{trunk_id}"`.
    pub trunks: Cache<String, Trunk>,
    /// DDIs keyed by `"{account_id}:{ddi_id}"`.
    pub ddis: Cache<String, DDI>,
    /// Hooks keyed by `"{account_id}:{hook_id}"`.
    pub hooks: Cache<String, Hook>,
    /// Assets keyed by `"{account_id}:{asset_id}"`.
    pub assets: Cache<String, Asset>,
    /// Maps `"{trunk_ip}:{ddi_id}"` to the owning account id.
    pub trunk_ddi_idents: Cache<String, String>,
    /// Proxies keyed by proxy id.
    pub proxies: Cache<String, Proxy>,
}
58
59impl CallableCache {
60 pub async fn new() -> CallableCache {
65 let (remote_cache, local_cache) = create_pool().await;
66 CallableCache {
67 local_cache,
68 remote_cache,
69 }
70 }
71
72 pub async fn get_region_by_id(self, id: &str) -> Result<Option<Region>, RedisError> {
80 if let Some(resolved_id) = self.local_cache.region_idents.get(id) {
82 if let Some(region) = self.local_cache.regions.get(resolved_id.as_str()) {
84 return Ok(Some(region));
85 }
86
87 match get_region_by_id(self.remote_cache.connection.clone(), &resolved_id).await? {
89 Some(region) => {
90 build_region_identifiers_locally(self.local_cache, ®ion.clone());
92 Ok(Some(region))
93 }
94 None => Ok(None),
95 }
96 } else {
97 match get_region_by_id(self.remote_cache.connection.clone(), id).await? {
99 Some(region) => {
100 build_region_identifiers_locally(self.local_cache, ®ion.clone());
102 Ok(Some(region))
103 }
104 None => Ok(None),
105 }
106 }
107 }
108
109 pub async fn get_account_by_trunk_ddi(
124 self,
125 ddi_id: &str,
126 trunk_ip: &str,
127 ) -> Result<Option<AccountLite>, RedisError> {
128 let trunk_ddi_key = format!("{}:{}", trunk_ip, ddi_id);
130
131 if let Some(account_id) = self.local_cache.trunk_ddi_idents.get(&trunk_ddi_key) {
133 if let Some(account) = self.local_cache.accounts.get(account_id.as_str()) {
135 return Ok(Some(account.clone()));
136 }
137
138 match get_account_by_id(self.remote_cache.connection.clone(), account_id.as_str())
140 .await?
141 {
142 Some(account) => {
143 build_account_identifiers_locally(self.local_cache, &account.clone());
145 Ok(Some(account))
146 }
147 None => Ok(None),
148 }
149 } else {
150 match get_trunk_and_ddi(self.remote_cache.connection.clone(), ddi_id, trunk_ip).await? {
152 Some(account) => {
153 build_account_identifiers_locally(self.local_cache.clone(), &account.clone());
155
156 self.local_cache
158 .trunk_ddi_idents
159 .insert(trunk_ddi_key, account.id.clone());
160
161 Ok(Some(account))
162 }
163 None => Ok(None),
164 }
165 }
166 }
167
168 pub async fn get_account_by_id(self, id: &str) -> Result<Option<AccountLite>, RedisError> {
176 if let Some(resolved_id) = self.local_cache.account_idents.get(id) {
178 if let Some(account) = self.local_cache.accounts.get(resolved_id.as_str()) {
180 return Ok(Some(account));
181 }
182
183 match get_account_by_id(self.remote_cache.connection.clone(), &resolved_id).await? {
185 Some(account) => {
186 build_account_identifiers_locally(self.local_cache, &account.clone());
188 Ok(Some(account))
189 }
190 None => Ok(None),
191 }
192 } else {
193 match get_account_by_id(self.remote_cache.connection.clone(), id).await? {
195 Some(account) => {
196 build_account_identifiers_locally(self.local_cache, &account.clone());
198 Ok(Some(account))
199 }
200 None => Ok(None),
201 }
202 }
203 }
204
205 pub async fn get_regions(self) -> Result<Vec<Region>, RedisError> {
210 let regions = get_regions(self.remote_cache.connection).await?;
211
212 for region in ®ions {
213 build_region_identifiers_locally(self.local_cache.clone(), region);
214 }
215
216 Ok(regions)
217 }
218
219 pub async fn get_accounts(self) -> Result<Vec<AccountLite>, RedisError> {
224 let accounts = get_accounts(self.remote_cache.connection).await?;
225
226 for account in &accounts {
227 build_account_identifiers_locally(self.local_cache.clone(), account);
228 }
229
230 Ok(accounts)
231 }
232
233 pub async fn get_str(self, key: &str) -> Result<Option<String>, RedisError> {
241 get_str(self.remote_cache.connection, key).await
242 }
243
244 pub async fn get_hash(self, key: &str, field: &str) -> Result<Option<String>, RedisError> {
253 get_hash(self.remote_cache.connection, key, field).await
254 }
255
256 pub async fn get_devices(self, account_id: &str) -> Result<Vec<DeviceStruct>, RedisError> {
264 let devices = get_devices(self.remote_cache.connection.clone(), account_id).await?;
266
267 for device in &devices {
269 let cache_key = format!("{}:{}", account_id, device.id);
270 self.local_cache.devices.insert(cache_key, device.clone());
271 }
272
273 Ok(devices)
274 }
275
276 pub async fn get_device(
285 self,
286 account_id: &str,
287 device_id: &str,
288 ) -> Result<Option<DeviceStruct>, RedisError> {
289 let cache_key = format!("{}:{}", account_id, device_id);
291 if let Some(device) = self.local_cache.devices.get(&cache_key) {
292 return Ok(Some(device));
293 }
294
295 match get_device(self.remote_cache.connection.clone(), account_id, device_id).await? {
297 Some(device) => {
298 self.local_cache.devices.insert(cache_key, device.clone());
300 Ok(Some(device))
301 }
302 None => Ok(None),
303 }
304 }
305
306 pub async fn get_trunks(self, account_id: &str) -> Result<Vec<Trunk>, RedisError> {
314 let trunks = get_trunks(self.remote_cache.connection.clone(), account_id).await?;
316
317 for trunk in &trunks {
319 let cache_key = format!("{}:{}", account_id, trunk.id);
320 self.local_cache.trunks.insert(cache_key, trunk.clone());
321 }
322
323 Ok(trunks)
324 }
325
326 pub async fn get_trunk(
335 self,
336 account_id: &str,
337 trunk_id: &str,
338 ) -> Result<Option<Trunk>, RedisError> {
339 let cache_key = format!("{}:{}", account_id, trunk_id);
341 if let Some(trunk) = self.local_cache.trunks.get(&cache_key) {
342 return Ok(Some(trunk));
343 }
344
345 match get_trunk(self.remote_cache.connection.clone(), account_id, trunk_id).await? {
347 Some(trunk) => {
348 self.local_cache.trunks.insert(cache_key, trunk.clone());
350 Ok(Some(trunk))
351 }
352 None => Ok(None),
353 }
354 }
355
356 pub async fn get_ddis(self, account_id: &str) -> Result<Vec<DDI>, RedisError> {
364 let ddis = get_ddis(self.remote_cache.connection.clone(), account_id).await?;
366
367 for ddi in &ddis {
369 let cache_key = format!("{}:{}", account_id, ddi.id);
370 self.local_cache.ddis.insert(cache_key, ddi.clone());
371 }
372
373 Ok(ddis)
374 }
375
376 pub async fn get_ddi(self, account_id: &str, ddi_id: &str) -> Result<Option<DDI>, RedisError> {
385 let cache_key = format!("{}:{}", account_id, ddi_id);
387 if let Some(ddi) = self.local_cache.ddis.get(&cache_key) {
388 return Ok(Some(ddi));
389 }
390
391 match get_ddi(self.remote_cache.connection.clone(), account_id, ddi_id).await? {
393 Some(ddi) => {
394 self.local_cache.ddis.insert(cache_key, ddi.clone());
396 Ok(Some(ddi))
397 }
398 None => Ok(None),
399 }
400 }
401
402 pub async fn get_hooks(self, account_id: &str) -> Result<Vec<Hook>, RedisError> {
410 let hooks = get_hooks(self.remote_cache.connection.clone(), account_id).await?;
412
413 for hook in &hooks {
415 let cache_key = format!("{}:{}", account_id, hook.id);
416 self.local_cache.hooks.insert(cache_key, hook.clone());
417 }
418
419 Ok(hooks)
420 }
421
422 pub async fn get_hook(
431 self,
432 account_id: &str,
433 hook_id: &str,
434 ) -> Result<Option<Hook>, RedisError> {
435 let cache_key = format!("{}:{}", account_id, hook_id);
437 if let Some(hook) = self.local_cache.hooks.get(&cache_key) {
438 return Ok(Some(hook));
439 }
440
441 match get_hook(self.remote_cache.connection.clone(), account_id, hook_id).await? {
443 Some(hook) => {
444 self.local_cache.hooks.insert(cache_key, hook.clone());
446 Ok(Some(hook))
447 }
448 None => Ok(None),
449 }
450 }
451
452 pub async fn get_assets(self, account_id: &str) -> Result<Vec<Asset>, RedisError> {
460 let assets = get_assets(self.remote_cache.connection.clone(), account_id).await?;
462
463 for asset in &assets {
465 let cache_key = format!("{}:{}", account_id, asset.id);
466 self.local_cache.assets.insert(cache_key, asset.clone());
467 }
468
469 Ok(assets)
470 }
471
472 pub async fn get_asset(
481 self,
482 account_id: &str,
483 asset_id: &str,
484 ) -> Result<Option<Asset>, RedisError> {
485 let cache_key = format!("{}:{}", account_id, asset_id);
487 if let Some(asset) = self.local_cache.assets.get(&cache_key) {
488 return Ok(Some(asset));
489 }
490
491 match get_asset(self.remote_cache.connection.clone(), account_id, asset_id).await? {
493 Some(asset) => {
494 self.local_cache.assets.insert(cache_key, asset.clone());
496 Ok(Some(asset))
497 }
498 None => Ok(None),
499 }
500 }
501
502 pub async fn get_proxy_by_id(&mut self, id: &str) -> Result<Option<Proxy>, RedisError> {
510 if let Some(proxy) = self.local_cache.proxies.get(id) {
511 Ok(Some(proxy))
512 } else {
513 match get_proxy_by_id(self.remote_cache.connection.clone(), id).await? {
515 Some(proxy) => {
516 self.local_cache
517 .proxies
518 .insert(id.to_string(), proxy.clone());
519 Ok(Some(proxy))
520 }
521 None => Ok(None),
522 }
523 }
524 }
525
526 pub async fn get_proxies(&mut self) -> Result<Vec<Proxy>, RedisError> {
527 let proxies = get_proxies(self.remote_cache.connection.clone()).await?;
528 self.local_cache.proxies.invalidate_all();
529 for proxy in &proxies {
530 self.local_cache.proxies.insert(proxy.id.clone(), proxy.clone());
531 }
532 Ok(proxies)
533 }
534}
535
/// Reads the Redis/Valkey connection string from the `CAL_VALKEY_HOST`
/// environment variable.
///
/// # Panics
///
/// Panics if the variable cannot be read.
fn get_redis_host() -> String {
    const HOST_VAR: &str = "CAL_VALKEY_HOST";

    match env::var(HOST_VAR) {
        Ok(host) => host,
        Err(_) => panic!("${} is not set", HOST_VAR),
    }
}
547
548pub async fn create_pool() -> (RedisCache, LocalCache) {
556 let client = redis::Client::open(get_redis_host()).expect("Failed to connect to Redis");
558
559 let (tx, mut rx) = unbounded_channel();
560 let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
561 let mut con = client
562 .get_multiplexed_async_connection_with_config(&config)
563 .await
564 .expect("Failed to get Redis connection");
565
566 con.subscribe(&["cal:events"])
567 .await
568 .expect("Failed to subscribe to events");
569
570 fn create_cache<T: Clone + Send + Sync + 'static>(
572 capacity_mb: u64,
573 ttl_secs: u64,
574 ) -> Cache<String, T> {
575 Cache::builder()
576 .max_capacity(capacity_mb * 1024 * 1024)
577 .time_to_live(Duration::from_secs(ttl_secs))
578 .build()
579 }
580
581 let local_cache = LocalCache {
583 flow_state: create_cache::<FlowState>(50, 60 * 60 * 4),
584 regions: create_cache::<Region>(5, 60),
585 region_idents: create_cache::<String>(5, 60),
586 accounts: create_cache::<AccountLite>(50, 60),
587 account_idents: create_cache::<String>(5, 60),
588 devices: create_cache::<DeviceStruct>(20, 60),
589 trunks: create_cache::<Trunk>(10, 60),
590 ddis: create_cache::<DDI>(10, 60),
591 hooks: create_cache::<Hook>(10, 60),
592 assets: create_cache::<Asset>(10, 60),
593 trunk_ddi_idents: create_cache::<String>(10, 60),
594 proxies: create_cache::<Proxy>(10, 60 * 60),
595 };
596
597 let local_cache_cloned = local_cache.clone();
598
599 tokio::spawn(async move {
601 while let Some(push_info) = rx.recv().await {
602 if push_info.kind != PushKind::Message {
603 continue;
604 }
605
606 if let Some(Value::BulkString(bytes)) = push_info.data.get(1) {
607 if let Ok(event_str) = String::from_utf8(bytes.clone()) {
608 if let Ok(event) = serde_json::from_str::<RedisEvent>(&event_str) {
609 handle_redis_event(event, &local_cache_cloned);
610 }
611 }
612 }
613 }
614 });
615
616 (RedisCache { connection: con }, local_cache)
617}
618
619fn handle_redis_event(event: RedisEvent, cache: &LocalCache) {
625 match event {
626 RedisEvent::AccountUpdate(update) => {
627 build_account_identifiers_locally(cache.clone(), &update.payload.into());
628 }
629 RedisEvent::AccountDelete(delete) => {
630 cache.accounts.invalidate(&delete.id);
631 }
632 RedisEvent::AccountCreate(create) => {
633 build_account_identifiers_locally(cache.clone(), &create.payload.into());
634 }
635 RedisEvent::AccountSync => {
636 cache.accounts.invalidate_all();
637 cache.account_idents.invalidate_all();
638 }
639 RedisEvent::RegionUpdate(update) => {
640 cache
641 .regions
642 .insert(update.payload.id.to_string(), update.payload);
643 }
644 RedisEvent::RegionDelete(delete) => {
645 cache.regions.invalidate(&delete.id);
646 }
647 RedisEvent::RegionCreate(create) => {
648 build_region_identifiers_locally(cache.clone(), &create.payload);
649 }
650 RedisEvent::RegionSync => {
651 cache.regions.invalidate_all();
652 cache.region_idents.invalidate_all();
653 }
654 }
655}
656
657pub async fn get_str(
666 mut con: MultiplexedConnection,
667 key: &str,
668) -> Result<Option<String>, RedisError> {
669 con.get(key).await
670}
671
672pub async fn get_hash(
682 mut con: MultiplexedConnection,
683 key: &str,
684 field: &str,
685) -> Result<Option<String>, RedisError> {
686 con.hget(key, field).await
687}
688
689pub async fn get_hash_all_values(
698 mut con: MultiplexedConnection,
699 key: &str,
700) -> Result<HashMap<String, String>, RedisError> {
701 con.hvals(key).await
702}