1use std::collections::HashMap;
7use std::convert::{TryFrom, TryInto};
8use std::fs;
9use std::io::{Error, ErrorKind, Result};
10use std::path::Path;
11use std::str::FromStr;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17
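/// Version 2 of the configuration format, grouping backend, cache and RAFS
/// filesystem settings under a single top-level structure.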
18#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
20pub struct ConfigV2 {
21 pub version: u32,
23 #[serde(default)]
25 pub id: String,
26 pub backend: Option<BackendConfigV2>,
28 pub cache: Option<CacheConfigV2>,
30 pub rafs: Option<RafsConfigV2>,
32 #[serde(skip)]
34 pub internal: ConfigV2Internal,
35}
36
37impl Default for ConfigV2 {
38 fn default() -> Self {
39 ConfigV2 {
40 version: 2,
41 id: String::new(),
42 backend: None,
43 cache: None,
44 rafs: None,
45 internal: ConfigV2Internal::default(),
46 }
47 }
48}
49
50impl ConfigV2 {
51 pub fn new(id: &str) -> Self {
53 ConfigV2 {
54 version: 2,
55 id: id.to_string(),
56 backend: None,
57 cache: None,
58 rafs: None,
59 internal: ConfigV2Internal::default(),
60 }
61 }
62
63 pub fn new_localfs(id: &str, dir: &str) -> Result<Self> {
65 let content = format!(
66 r#"
67 version = 2
68 id = "{}"
69 backend.type = "localfs"
70 backend.localfs.dir = "{}"
71 cache.type = "filecache"
72 cache.compressed = false
73 cache.validate = false
74 cache.filecache.work_dir = "{}"
75 "#,
76 id, dir, dir
77 );
78
79 Self::from_str(&content)
80 }
81
82 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
84 let md = fs::metadata(path.as_ref())?;
85 if md.len() > 0x100000 {
86 return Err(Error::new(
87 ErrorKind::Other,
88 "configuration file size is too big",
89 ));
90 }
91 let content = fs::read_to_string(path)?;
92 Self::from_str(&content)
93 }
94
95 pub fn validate(&self) -> bool {
97 if self.version != 2 {
98 return false;
99 }
100 if let Some(backend_cfg) = self.backend.as_ref() {
101 if !backend_cfg.validate() {
102 return false;
103 }
104 }
105 if let Some(cache_cfg) = self.cache.as_ref() {
106 if !cache_cfg.validate() {
107 return false;
108 }
109 }
110 if let Some(rafs_cfg) = self.rafs.as_ref() {
111 if !rafs_cfg.validate() {
112 return false;
113 }
114 }
115
116 true
117 }
118
119 pub fn get_backend_config(&self) -> Result<&BackendConfigV2> {
121 self.backend.as_ref().ok_or_else(|| {
122 Error::new(
123 ErrorKind::InvalidInput,
124 "no configuration information for backend",
125 )
126 })
127 }
128
129 pub fn get_cache_config(&self) -> Result<&CacheConfigV2> {
131 self.cache.as_ref().ok_or_else(|| {
132 Error::new(
133 ErrorKind::InvalidData,
134 "no configuration information for cache",
135 )
136 })
137 }
138
139 pub fn get_cache_working_directory(&self) -> Result<String> {
141 let cache = self.get_cache_config()?;
142 if cache.is_filecache() {
143 if let Some(c) = cache.file_cache.as_ref() {
144 return Ok(c.work_dir.clone());
145 }
146 } else if cache.is_fscache() {
147 if let Some(c) = cache.fs_cache.as_ref() {
148 return Ok(c.work_dir.clone());
149 }
150 }
151
152 Err(Error::new(
153 ErrorKind::NotFound,
154 "no working directory configured",
155 ))
156 }
157
158 pub fn get_rafs_config(&self) -> Result<&RafsConfigV2> {
160 self.rafs.as_ref().ok_or_else(|| {
161 Error::new(
162 ErrorKind::InvalidInput,
163 "no configuration information for rafs",
164 )
165 })
166 }
167
168 pub fn clone_without_secrets(&self) -> Self {
170 let mut cfg = self.clone();
171
172 if let Some(backend_cfg) = cfg.backend.as_mut() {
173 if let Some(oss_cfg) = backend_cfg.oss.as_mut() {
174 oss_cfg.access_key_id = String::new();
175 oss_cfg.access_key_secret = String::new();
176 }
177 if let Some(registry_cfg) = backend_cfg.registry.as_mut() {
178 registry_cfg.auth = None;
179 registry_cfg.registry_token = None;
180 }
181 }
182
183 cfg
184 }
185
186 pub fn is_chunk_validation_enabled(&self) -> bool {
188 let mut validation = if let Some(cache) = &self.cache {
189 cache.cache_validate
190 } else {
191 false
192 };
193 if let Some(rafs) = &self.rafs {
194 if rafs.validate {
195 validation = true;
196 }
197 }
198
199 validation
200 }
201
202 pub fn is_fs_cache(&self) -> bool {
204 if let Some(cache) = self.cache.as_ref() {
205 cache.fs_cache.is_some()
206 } else {
207 false
208 }
209 }
210
211 pub fn update_registry_auth_info(&mut self, auth: &Option<String>) {
213 if let Some(auth) = auth {
214 if let Some(backend) = self.backend.as_mut() {
215 if let Some(registry) = backend.registry.as_mut() {
216 registry.auth = Some(auth.to_string());
217 }
218 }
219 }
220 }
221}
222
223impl FromStr for ConfigV2 {
224 type Err = std::io::Error;
225
226 fn from_str(s: &str) -> Result<ConfigV2> {
227 if let Ok(v) = serde_json::from_str::<ConfigV2>(s) {
228 return if v.validate() {
229 Ok(v)
230 } else {
231 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
232 };
233 }
234 if let Ok(v) = toml::from_str::<ConfigV2>(s) {
235 return if v.validate() {
236 Ok(v)
237 } else {
238 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
239 };
240 }
241 if let Ok(v) = serde_json::from_str::<RafsConfig>(s) {
242 if let Ok(v) = ConfigV2::try_from(v) {
243 if v.validate() {
244 return Ok(v);
245 }
246 }
247 }
248 Err(Error::new(
249 ErrorKind::InvalidInput,
250 "failed to parse configuration information",
251 ))
252 }
253}
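// `ConfigV2::from_str()` tries the supported formats in order: a v2 JSON document,
// a v2 TOML document, and finally the legacy v1 `RafsConfig` JSON layout, which is
// converted through `TryFrom<RafsConfig>`. A minimal usage sketch (the string and
// path literals here are illustrative only):
//
//     let cfg = ConfigV2::from_str("version = 2")?;
//     let cfg = ConfigV2::from_file("/path/to/nydus-config.toml")?;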
254
255#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
257pub struct BackendConfigV2 {
258 #[serde(rename = "type")]
260 pub backend_type: String,
261 pub localdisk: Option<LocalDiskConfig>,
263 pub localfs: Option<LocalFsConfig>,
265 pub oss: Option<OssConfig>,
267 pub s3: Option<S3Config>,
269 pub registry: Option<RegistryConfig>,
271 #[serde(rename = "http-proxy")]
273 pub http_proxy: Option<HttpProxyConfig>,
274}
275
276impl BackendConfigV2 {
277 pub fn validate(&self) -> bool {
279 match self.backend_type.as_str() {
280 "localdisk" => match self.localdisk.as_ref() {
281 Some(v) => {
282 if v.device_path.is_empty() {
283 return false;
284 }
285 }
286 None => return false,
287 },
288 "localfs" => match self.localfs.as_ref() {
289 Some(v) => {
290 if v.blob_file.is_empty() && v.dir.is_empty() {
291 return false;
292 }
293 }
294 None => return false,
295 },
296 "oss" => match self.oss.as_ref() {
297 Some(v) => {
298 if v.endpoint.is_empty() || v.bucket_name.is_empty() {
299 return false;
300 }
301 }
302 None => return false,
303 },
304 "s3" => match self.s3.as_ref() {
305 Some(v) => {
306 if v.region.is_empty() || v.bucket_name.is_empty() {
307 return false;
308 }
309 }
310 None => return false,
311 },
312 "registry" => match self.registry.as_ref() {
313 Some(v) => {
314 if v.host.is_empty() || v.repo.is_empty() {
315 return false;
316 }
317 }
318 None => return false,
319 },
320
321 "http-proxy" => match self.http_proxy.as_ref() {
322 Some(v) => {
323 let is_valid_unix_socket_path = |path: &str| {
324 let path = Path::new(path);
325 path.is_absolute() && path.exists()
326 };
327 if v.addr.is_empty()
328 || !(v.addr.starts_with("http://")
329 || v.addr.starts_with("https://")
330 || is_valid_unix_socket_path(&v.addr))
331 {
332 return false;
333 }
334
335 if Path::new(&v.path).join("any_blob_id").to_str().is_none() {
337 return false;
338 }
339 }
340 None => return false,
341 },
342 _ => return false,
343 }
344
345 true
346 }
347
348 pub fn get_localdisk_config(&self) -> Result<&LocalDiskConfig> {
350 if &self.backend_type != "localdisk" {
351 Err(Error::new(
352 ErrorKind::InvalidInput,
353 "backend type is not 'localdisk'",
354 ))
355 } else {
356 self.localdisk.as_ref().ok_or_else(|| {
357 Error::new(
358 ErrorKind::InvalidData,
359 "no configuration information for localdisk",
360 )
361 })
362 }
363 }
364
365 pub fn get_localfs_config(&self) -> Result<&LocalFsConfig> {
367 if &self.backend_type != "localfs" {
368 Err(Error::new(
369 ErrorKind::InvalidInput,
370 "backend type is not 'localfs'",
371 ))
372 } else {
373 self.localfs.as_ref().ok_or_else(|| {
374 Error::new(
375 ErrorKind::InvalidData,
376 "no configuration information for localfs",
377 )
378 })
379 }
380 }
381
382 pub fn get_oss_config(&self) -> Result<&OssConfig> {
384 if &self.backend_type != "oss" {
385 Err(Error::new(
386 ErrorKind::InvalidInput,
387 "backend type is not 'oss'",
388 ))
389 } else {
390 self.oss.as_ref().ok_or_else(|| {
391 Error::new(
392 ErrorKind::InvalidData,
393 "no configuration information for OSS",
394 )
395 })
396 }
397 }
398
399 pub fn get_s3_config(&self) -> Result<&S3Config> {
401 if &self.backend_type != "s3" {
402 Err(Error::new(
403 ErrorKind::InvalidInput,
404 "backend type is not 's3'",
405 ))
406 } else {
407 self.s3.as_ref().ok_or_else(|| {
408 Error::new(
409 ErrorKind::InvalidData,
410 "no configuration information for s3",
411 )
412 })
413 }
414 }
415
416 pub fn get_registry_config(&self) -> Result<&RegistryConfig> {
418 if &self.backend_type != "registry" {
419 Err(Error::new(
420 ErrorKind::InvalidInput,
421 "backend type is not 'registry'",
422 ))
423 } else {
424 self.registry.as_ref().ok_or_else(|| {
425 Error::new(
426 ErrorKind::InvalidData,
427 "no configuration information for registry",
428 )
429 })
430 }
431 }
432
433 pub fn get_http_proxy_config(&self) -> Result<&HttpProxyConfig> {
435 if &self.backend_type != "http-proxy" {
436 Err(Error::new(
437 ErrorKind::InvalidInput,
438 "backend type is not 'http-proxy'",
439 ))
440 } else {
441 self.http_proxy.as_ref().ok_or_else(|| {
442 Error::new(
443 ErrorKind::InvalidData,
444 "no configuration information for http-proxy",
445 )
446 })
447 }
448 }
449}
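// Every backend type accepted by `validate()` has a matching typed accessor above.
// A sketch of dispatching on the configured backend (assuming a registry backend):
//
//     let backend = config.get_backend_config()?;
//     if backend.backend_type == "registry" {
//         let registry = backend.get_registry_config()?;
//         // use registry.host, registry.repo, ...
//     }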
450
451#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
453pub struct LocalDiskConfig {
454 #[serde(default)]
456 pub device_path: String,
457 #[serde(default)]
459 pub disable_gpt: bool,
460}
461
462#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
464pub struct LocalFsConfig {
465 #[serde(default)]
467 pub blob_file: String,
468 #[serde(default)]
470 pub dir: String,
471 #[serde(default)]
473 pub alt_dirs: Vec<String>,
474}
475
476#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
478pub struct OssConfig {
479 #[serde(default = "default_http_scheme")]
481 pub scheme: String,
482 pub endpoint: String,
484 pub bucket_name: String,
486 #[serde(default)]
491 pub object_prefix: String,
492 #[serde(default)]
494 pub access_key_id: String,
495 #[serde(default)]
497 pub access_key_secret: String,
498 #[serde(default)]
500 pub skip_verify: bool,
501 #[serde(default = "default_http_timeout")]
503 pub timeout: u32,
504 #[serde(default = "default_http_timeout")]
506 pub connect_timeout: u32,
507 #[serde(default)]
509 pub retry_limit: u8,
510 #[serde(default)]
512 pub proxy: ProxyConfig,
513 #[serde(default)]
515 pub mirrors: Vec<MirrorConfig>,
516}
517
518#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
520pub struct S3Config {
521 #[serde(default = "default_http_scheme")]
523 pub scheme: String,
524 pub endpoint: String,
526 pub region: String,
528 pub bucket_name: String,
530 #[serde(default)]
535 pub object_prefix: String,
536 #[serde(default)]
538 pub access_key_id: String,
539 #[serde(default)]
541 pub access_key_secret: String,
542 #[serde(default)]
544 pub skip_verify: bool,
545 #[serde(default = "default_http_timeout")]
547 pub timeout: u32,
548 #[serde(default = "default_http_timeout")]
550 pub connect_timeout: u32,
551 #[serde(default)]
553 pub retry_limit: u8,
554 #[serde(default)]
556 pub proxy: ProxyConfig,
557 #[serde(default)]
559 pub mirrors: Vec<MirrorConfig>,
560}
561
562#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
564pub struct HttpProxyConfig {
565 pub addr: String,
567 #[serde(default)]
570 pub path: String,
571 #[serde(default)]
573 pub skip_verify: bool,
574 #[serde(default = "default_http_timeout")]
576 pub timeout: u32,
577 #[serde(default = "default_http_timeout")]
579 pub connect_timeout: u32,
580 #[serde(default)]
582 pub retry_limit: u8,
583 #[serde(default)]
585 pub proxy: ProxyConfig,
586 #[serde(default)]
588 pub mirrors: Vec<MirrorConfig>,
589}
590
591#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
593pub struct RegistryConfig {
594 #[serde(default = "default_http_scheme")]
596 pub scheme: String,
597 pub host: String,
599 pub repo: String,
601 #[serde(default)]
603 pub auth: Option<String>,
604 #[serde(default)]
606 pub skip_verify: bool,
607 #[serde(default = "default_http_timeout")]
609 pub timeout: u32,
610 #[serde(default = "default_http_timeout")]
612 pub connect_timeout: u32,
613 #[serde(default)]
615 pub retry_limit: u8,
616 #[serde(default)]
618 pub registry_token: Option<String>,
619 #[serde(default)]
622 pub blob_url_scheme: String,
623 #[serde(default)]
625 pub blob_redirected_host: String,
626 #[serde(default)]
628 pub proxy: ProxyConfig,
629 #[serde(default)]
631 pub mirrors: Vec<MirrorConfig>,
632}
633
634#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
636pub struct CacheConfigV2 {
637 #[serde(default, rename = "type")]
639 pub cache_type: String,
640 #[serde(default, rename = "compressed")]
642 pub cache_compressed: bool,
643 #[serde(default, rename = "validate")]
645 pub cache_validate: bool,
646 #[serde(default)]
648 pub prefetch: PrefetchConfigV2,
649 #[serde(rename = "filecache")]
651 pub file_cache: Option<FileCacheConfig>,
652 #[serde(rename = "fscache")]
653 pub fs_cache: Option<FsCacheConfig>,
655}
656
657impl CacheConfigV2 {
658 pub fn validate(&self) -> bool {
660 match self.cache_type.as_str() {
661 "blobcache" | "filecache" => {
662 if let Some(c) = self.file_cache.as_ref() {
663 if c.work_dir.is_empty() {
664 return false;
665 }
666 } else {
667 return false;
668 }
669 }
670 "fscache" => {
671 if let Some(c) = self.fs_cache.as_ref() {
672 if c.work_dir.is_empty() {
673 return false;
674 }
675 } else {
676 return false;
677 }
678 }
679 "" | "dummycache" => {}
680 _ => return false,
681 }
682
683 if self.prefetch.enable {
684 if self.prefetch.batch_size > 0x10000000 {
685 return false;
686 }
687 if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
688 return false;
689 }
690 }
691
692 true
693 }
694
695 pub fn is_filecache(&self) -> bool {
697 self.cache_type == "blobcache" || self.cache_type == "filecache"
698 }
699
700 pub fn is_fscache(&self) -> bool {
702 self.cache_type == "fscache"
703 }
704
705 pub fn get_filecache_config(&self) -> Result<&FileCacheConfig> {
707 if self.is_filecache() {
708 self.file_cache.as_ref().ok_or_else(|| {
709 Error::new(
710 ErrorKind::InvalidInput,
711 "no configuration information for filecache",
712 )
713 })
714 } else {
715 Err(Error::new(
716 ErrorKind::InvalidData,
717 "cache type is not 'filecache'",
718 ))
719 }
720 }
721
722 pub fn get_fscache_config(&self) -> Result<&FsCacheConfig> {
724 if self.is_fscache() {
725 self.fs_cache.as_ref().ok_or_else(|| {
726 Error::new(
727 ErrorKind::InvalidData,
728 "no configuration information for fscache",
729 )
730 })
731 } else {
732 Err(Error::new(
733 ErrorKind::InvalidInput,
734 "cache type is not 'fscache'",
735 ))
736 }
737 }
738}
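// The cache type string selects the optional sub-configuration: "blobcache" and
// "filecache" use `file_cache`, "fscache" uses `fs_cache`, while "" and "dummycache"
// need no extra settings. A sketch of resolving the working directory by cache type:
//
//     let cache = config.get_cache_config()?;
//     let work_dir = if cache.is_filecache() {
//         cache.get_filecache_config()?.get_work_dir()?
//     } else {
//         cache.get_fscache_config()?.get_work_dir()?
//     };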
739
740#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
742pub struct FileCacheConfig {
743 #[serde(default = "default_work_dir")]
745 pub work_dir: String,
746 #[serde(default)]
748 pub disable_indexed_map: bool,
749 #[serde(default)]
751 pub enable_encryption: bool,
752 #[serde(default)]
754 pub enable_convergent_encryption: bool,
755 #[serde(default)]
757 pub encryption_key: String,
758}
759
760impl FileCacheConfig {
761 pub fn get_work_dir(&self) -> Result<&str> {
763 let path = fs::metadata(&self.work_dir)
764 .or_else(|_| {
765 fs::create_dir_all(&self.work_dir)?;
766 fs::metadata(&self.work_dir)
767 })
768 .map_err(|e| {
769 log::error!("failed to stat filecache work_dir {}: {}", self.work_dir, e);
770 e
771 })?;
772
773 if path.is_dir() {
774 Ok(&self.work_dir)
775 } else {
776 Err(Error::new(
777 ErrorKind::NotFound,
778 format!("filecache work_dir {} is not a directory", self.work_dir),
779 ))
780 }
781 }
782}
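// Note: `get_work_dir()` creates the configured directory if it does not yet exist
// and only fails when the path cannot be created or is not a directory.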
783
784#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
786pub struct FsCacheConfig {
787 #[serde(default = "default_work_dir")]
789 pub work_dir: String,
790}
791
792impl FsCacheConfig {
793 pub fn get_work_dir(&self) -> Result<&str> {
795 let path = fs::metadata(&self.work_dir)
796 .or_else(|_| {
797 fs::create_dir_all(&self.work_dir)?;
798 fs::metadata(&self.work_dir)
799 })
800 .map_err(|e| {
801 log::error!("failed to stat fscache work_dir {}: {}", self.work_dir, e);
802 e
803 })?;
804
805 if path.is_dir() {
806 Ok(&self.work_dir)
807 } else {
808 Err(Error::new(
809 ErrorKind::NotFound,
810 format!("fscache work_dir {} is not a directory", self.work_dir),
811 ))
812 }
813 }
814}
815
816#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
818pub struct RafsConfigV2 {
819 #[serde(default = "default_rafs_mode")]
821 pub mode: String,
822 #[serde(default = "default_batch_size")]
824 pub batch_size: usize,
825 #[serde(default)]
827 pub validate: bool,
828 #[serde(default)]
830 pub enable_xattr: bool,
831 #[serde(default)]
835 pub iostats_files: bool,
836 #[serde(default)]
838 pub access_pattern: bool,
839 #[serde(default)]
841 pub latest_read_files: bool,
842 #[serde(default)]
844 pub prefetch: PrefetchConfigV2,
845}
846
847impl RafsConfigV2 {
848 pub fn validate(&self) -> bool {
850 if self.mode != "direct" && self.mode != "cached" {
851 return false;
852 }
853 if self.batch_size > 0x10000000 {
854 return false;
855 }
856 if self.prefetch.enable {
857 if self.prefetch.batch_size > 0x10000000 {
858 return false;
859 }
860 if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
861 return false;
862 }
863 }
864
865 true
866 }
867}
868
869#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)]
871pub struct PrefetchConfigV2 {
872 pub enable: bool,
874 #[serde(default = "default_prefetch_threads")]
876 pub threads: usize,
877 #[serde(default = "default_prefetch_batch_size")]
879 pub batch_size: usize,
880 #[serde(default)]
882 pub bandwidth_limit: u32,
883 #[serde(default)]
885 pub prefetch_all: bool,
886}
887
888#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
890pub struct ProxyConfig {
891 #[serde(default)]
893 pub url: String,
894 #[serde(default)]
896 pub ping_url: String,
897 #[serde(default = "default_true")]
899 pub fallback: bool,
900 #[serde(default = "default_check_interval")]
902 pub check_interval: u64,
903 #[serde(default)]
905 pub use_http: bool,
906}
907
908impl Default for ProxyConfig {
909 fn default() -> Self {
910 Self {
911 url: String::new(),
912 ping_url: String::new(),
913 fallback: true,
914 check_interval: 5,
915 use_http: false,
916 }
917 }
918}
919
920#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
922pub struct MirrorConfig {
923 pub host: String,
925 #[serde(default)]
927 pub ping_url: String,
928 #[serde(default)]
930 pub headers: HashMap<String, String>,
931 #[serde(default = "default_check_interval")]
933 pub health_check_interval: u64,
934 #[serde(default = "default_failure_limit")]
936 pub failure_limit: u8,
937}
938
939impl Default for MirrorConfig {
940 fn default() -> Self {
941 Self {
942 host: String::new(),
943 headers: HashMap::new(),
944 health_check_interval: 5,
945 failure_limit: 5,
946 ping_url: String::new(),
947 }
948 }
949}
950
951#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
953pub struct BlobCacheEntryConfigV2 {
954 pub version: u32,
956 #[serde(default)]
958 pub id: String,
959 #[serde(default)]
961 pub backend: BackendConfigV2,
962 #[serde(default)]
964 pub cache: CacheConfigV2,
965 #[serde(default)]
967 pub metadata_path: Option<String>,
968}
969
970impl BlobCacheEntryConfigV2 {
971 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
973 let md = fs::metadata(path.as_ref())?;
974 if md.len() > 0x100000 {
975 return Err(Error::new(
976 ErrorKind::InvalidInput,
977 "configuration file size is too big",
978 ));
979 }
980 let content = fs::read_to_string(path)?;
981 Self::from_str(&content)
982 }
983
984 pub fn validate(&self) -> bool {
986 if self.version != 2 {
987 return false;
988 }
989 let config: ConfigV2 = self.into();
990 config.validate()
991 }
992}
993
994impl FromStr for BlobCacheEntryConfigV2 {
995 type Err = Error;
996
997 fn from_str(s: &str) -> Result<BlobCacheEntryConfigV2> {
998 if let Ok(v) = serde_json::from_str::<BlobCacheEntryConfigV2>(s) {
999 return if v.validate() {
1000 Ok(v)
1001 } else {
1002 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
1003 };
1004 }
1005 if let Ok(v) = toml::from_str::<BlobCacheEntryConfigV2>(s) {
1006 return if v.validate() {
1007 Ok(v)
1008 } else {
1009 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
1010 };
1011 }
1012 Err(Error::new(
1013 ErrorKind::InvalidInput,
1014 "failed to parse configuration information",
1015 ))
1016 }
1017}
1018
1019impl From<&BlobCacheEntryConfigV2> for ConfigV2 {
1020 fn from(c: &BlobCacheEntryConfigV2) -> Self {
1021 ConfigV2 {
1022 version: c.version,
1023 id: c.id.clone(),
1024 backend: Some(c.backend.clone()),
1025 cache: Some(c.cache.clone()),
1026 rafs: None,
1027 internal: ConfigV2Internal::default(),
1028 }
1029 }
1030}
1031
1032#[derive(Clone, Debug)]
1034pub struct ConfigV2Internal {
1035 pub blob_accessible: Arc<AtomicBool>,
1037}
1038
1039impl Default for ConfigV2Internal {
1040 fn default() -> Self {
1041 ConfigV2Internal {
1042 blob_accessible: Arc::new(AtomicBool::new(false)),
1043 }
1044 }
1045}
1046
1047impl PartialEq for ConfigV2Internal {
1048 fn eq(&self, other: &Self) -> bool {
1049 self.blob_accessible() == other.blob_accessible()
1050 }
1051}
1052
1053impl Eq for ConfigV2Internal {}
1054
1055impl ConfigV2Internal {
1056 pub fn blob_accessible(&self) -> bool {
1058 self.blob_accessible.load(Ordering::Relaxed)
1059 }
1060
1061 pub fn set_blob_accessible(&self, accessible: bool) {
1063 self.blob_accessible.store(accessible, Ordering::Relaxed);
1064 }
1065}
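// `blob_accessible` is runtime-only state: it is excluded from (de)serialization via
// `#[serde(skip)]` on `ConfigV2::internal` and shared through an `Arc<AtomicBool>`,
// so clones of a configuration observe the same flag.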
1066
1067pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap";
1069pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob";
1071
1072#[derive(Debug, Deserialize, Serialize)]
1074pub struct BlobCacheEntry {
1075 #[serde(rename = "type")]
1077 pub blob_type: String,
1078 #[serde(rename = "id")]
1080 pub blob_id: String,
1081 #[serde(default, rename = "config")]
1083 pub(crate) blob_config_legacy: Option<BlobCacheEntryConfig>,
1084 #[serde(default, rename = "config_v2")]
1086 pub blob_config: Option<BlobCacheEntryConfigV2>,
1087 #[serde(default)]
1089 pub domain_id: String,
1090}
1091
1092impl BlobCacheEntry {
1093 pub fn prepare_configuration_info(&mut self) -> bool {
1094 if self.blob_config.is_none() {
1095 if let Some(legacy) = self.blob_config_legacy.as_ref() {
1096 match legacy.try_into() {
1097 Err(_) => return false,
1098 Ok(v) => self.blob_config = Some(v),
1099 }
1100 }
1101 }
1102
1103 match self.blob_config.as_ref() {
1104 None => false,
1105 Some(cfg) => cfg.cache.validate() && cfg.backend.validate(),
1106 }
1107 }
1108}
1109
1110impl BlobCacheEntry {
1111 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
1113 let md = fs::metadata(path.as_ref())?;
1114 if md.len() > 0x100000 {
1115 return Err(Error::new(
1116 ErrorKind::InvalidInput,
1117 "configuration file size is too big",
1118 ));
1119 }
1120 let content = fs::read_to_string(path)?;
1121 Self::from_str(&content)
1122 }
1123
1124 pub fn validate(&self) -> bool {
1126 if self.blob_type != BLOB_CACHE_TYPE_META_BLOB
1127 && self.blob_type != BLOB_CACHE_TYPE_DATA_BLOB
1128 {
1129 log::warn!("invalid blob type {} for blob cache entry", self.blob_type);
1130 return false;
1131 }
1132 if let Some(config) = self.blob_config.as_ref() {
1133 if !config.validate() {
1134 return false;
1135 }
1136 }
1137 true
1138 }
1139}
1140
1141impl FromStr for BlobCacheEntry {
1142 type Err = Error;
1143
1144 fn from_str(s: &str) -> Result<BlobCacheEntry> {
1145 if let Ok(v) = serde_json::from_str::<BlobCacheEntry>(s) {
1146 return if v.validate() {
1147 Ok(v)
1148 } else {
1149 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
1150 };
1151 }
1152 if let Ok(v) = toml::from_str::<BlobCacheEntry>(s) {
1153 return if v.validate() {
1154 Ok(v)
1155 } else {
1156 Err(Error::new(ErrorKind::InvalidInput, "invalid configuration"))
1157 };
1158 }
1159 Err(Error::new(
1160 ErrorKind::InvalidInput,
1161 "failed to parse configuration information",
1162 ))
1163 }
1164}
1165
1166#[derive(Debug, Default, Deserialize, Serialize)]
1168pub struct BlobCacheList {
1169 pub blobs: Vec<BlobCacheEntry>,
1171}
1172
1173fn default_true() -> bool {
1174 true
1175}
1176
1177fn default_http_scheme() -> String {
1178 "https".to_string()
1179}
1180
1181fn default_http_timeout() -> u32 {
1182 5
1183}
1184
1185fn default_check_interval() -> u64 {
1186 5
1187}
1188
1189fn default_failure_limit() -> u8 {
1190 5
1191}
1192
1193fn default_work_dir() -> String {
1194 ".".to_string()
1195}
1196
1197pub fn default_batch_size() -> usize {
1198 128 * 1024
1199}
1200
1201fn default_prefetch_batch_size() -> usize {
1202 1024 * 1024
1203}
1204
1205fn default_prefetch_threads() -> usize {
1206 8
1207}
1208
1209fn default_prefetch_all() -> bool {
1210 true
1211}
1212
1213fn default_rafs_mode() -> String {
1214 "direct".to_string()
1215}
1216
1217#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
1223struct BackendConfig {
1224 #[serde(rename = "type")]
1226 pub backend_type: String,
1227 #[serde(rename = "config")]
1230 pub backend_config: Value,
1231}
1232
1233impl TryFrom<&BackendConfig> for BackendConfigV2 {
1234 type Error = std::io::Error;
1235
1236 fn try_from(value: &BackendConfig) -> std::result::Result<Self, Self::Error> {
1237 let mut config = BackendConfigV2 {
1238 backend_type: value.backend_type.clone(),
1239 localdisk: None,
1240 localfs: None,
1241 oss: None,
1242 s3: None,
1243 registry: None,
1244 http_proxy: None,
1245 };
1246
1247 match value.backend_type.as_str() {
1248 "localdisk" => {
1249 config.localdisk = Some(serde_json::from_value(value.backend_config.clone())?);
1250 }
1251 "localfs" => {
1252 config.localfs = Some(serde_json::from_value(value.backend_config.clone())?);
1253 }
1254 "oss" => {
1255 config.oss = Some(serde_json::from_value(value.backend_config.clone())?);
1256 }
1257 "s3" => {
1258 config.s3 = Some(serde_json::from_value(value.backend_config.clone())?);
1259 }
1260 "registry" => {
1261 config.registry = Some(serde_json::from_value(value.backend_config.clone())?);
1262 }
1263 v => {
1264 return Err(Error::new(
1265 ErrorKind::InvalidInput,
1266 format!("unsupported backend type '{}'", v),
1267 ))
1268 }
1269 }
1270
1271 Ok(config)
1272 }
1273}
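// Legacy (v1) backend configuration carries an opaque JSON `config` object; the
// conversion above re-deserializes it into the matching typed v2 structure. A sketch
// of converting a legacy snippet (field values are illustrative only):
//
//     let legacy: BackendConfig = serde_json::from_str(
//         r#"{"type":"localfs","config":{"dir":"/tmp/blobs"}}"#,
//     )?;
//     let v2: BackendConfigV2 = (&legacy).try_into()?;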
1274
1275#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
1277struct CacheConfig {
1278 #[serde(default, rename = "type")]
1280 pub cache_type: String,
1281 #[serde(default, rename = "compressed")]
1283 pub cache_compressed: bool,
1284 #[serde(default, rename = "config")]
1286 pub cache_config: Value,
1287 #[serde(skip_serializing, skip_deserializing)]
1289 pub cache_validate: bool,
1290 #[serde(skip_serializing, skip_deserializing)]
1292 pub prefetch_config: BlobPrefetchConfig,
1293}
1294
1295impl TryFrom<&CacheConfig> for CacheConfigV2 {
1296 type Error = std::io::Error;
1297
1298 fn try_from(v: &CacheConfig) -> std::result::Result<Self, Self::Error> {
1299 let mut config = CacheConfigV2 {
1300 cache_type: v.cache_type.clone(),
1301 cache_compressed: v.cache_compressed,
1302 cache_validate: v.cache_validate,
1303 prefetch: (&v.prefetch_config).into(),
1304 file_cache: None,
1305 fs_cache: None,
1306 };
1307
1308 match v.cache_type.as_str() {
1309 "blobcache" | "filecache" => {
1310 config.file_cache = Some(serde_json::from_value(v.cache_config.clone())?);
1311 }
1312 "fscache" => {
1313 config.fs_cache = Some(serde_json::from_value(v.cache_config.clone())?);
1314 }
1315 "" => {}
1316 t => {
1317 return Err(Error::new(
1318 ErrorKind::InvalidInput,
1319 format!("unsupported cache type '{}'", t),
1320 ))
1321 }
1322 }
1323
1324 Ok(config)
1325 }
1326}
1327
1328#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
1330struct FactoryConfig {
1331 #[serde(default)]
1333 pub id: String,
1334 pub backend: BackendConfig,
1336 #[serde(default)]
1338 pub cache: CacheConfig,
1339}
1340
1341#[derive(Clone, Default, Deserialize)]
1343struct RafsConfig {
1344 pub device: FactoryConfig,
1346 pub mode: String,
1348 #[serde(default)]
1350 pub digest_validate: bool,
1351 #[serde(default)]
1353 pub iostats_files: bool,
1354 #[serde(default)]
1356 pub fs_prefetch: FsPrefetchControl,
1357 #[serde(default)]
1359 pub enable_xattr: bool,
1360 #[serde(default)]
1362 pub access_pattern: bool,
1363 #[serde(default)]
1365 pub latest_read_files: bool,
1366 #[serde(default = "default_batch_size")]
1368 pub amplify_io: usize,
1369}
1370
1371impl TryFrom<RafsConfig> for ConfigV2 {
1372 type Error = std::io::Error;
1373
1374 fn try_from(v: RafsConfig) -> std::result::Result<Self, Self::Error> {
1375 let backend: BackendConfigV2 = (&v.device.backend).try_into()?;
1376 let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?;
1377 let rafs = RafsConfigV2 {
1378 mode: v.mode,
1379 batch_size: v.amplify_io,
1380 validate: v.digest_validate,
1381 enable_xattr: v.enable_xattr,
1382 iostats_files: v.iostats_files,
1383 access_pattern: v.access_pattern,
1384 latest_read_files: v.latest_read_files,
1385 prefetch: v.fs_prefetch.into(),
1386 };
1387 if !cache.prefetch.enable && rafs.prefetch.enable {
1388 cache.prefetch = rafs.prefetch.clone();
1389 }
1390
1391 Ok(ConfigV2 {
1392 version: 2,
1393 id: v.device.id,
1394 backend: Some(backend),
1395 cache: Some(cache),
1396 rafs: Some(rafs),
1397 internal: ConfigV2Internal::default(),
1398 })
1399 }
1400}
1401
1402#[derive(Clone, Default, Deserialize)]
1404struct FsPrefetchControl {
1405 #[serde(default)]
1407 pub enable: bool,
1408
1409 #[serde(default = "default_prefetch_threads")]
1411 pub threads_count: usize,
1412
1413 #[serde(default = "default_batch_size")]
1415 pub merging_size: usize,
1416
1417 #[serde(default)]
1426 pub bandwidth_rate: u32,
1427
1428 #[serde(default = "default_prefetch_all")]
1430 pub prefetch_all: bool,
1431}
1432
1433impl From<FsPrefetchControl> for PrefetchConfigV2 {
1434 fn from(v: FsPrefetchControl) -> Self {
1435 PrefetchConfigV2 {
1436 enable: v.enable,
1437 threads: v.threads_count,
1438 batch_size: v.merging_size,
1439 bandwidth_limit: v.bandwidth_rate,
1440 prefetch_all: v.prefetch_all,
1441 }
1442 }
1443}
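// The legacy prefetch fields are renamed during conversion: `threads_count` maps to
// `threads`, `merging_size` to `batch_size`, and `bandwidth_rate` to `bandwidth_limit`;
// `prefetch_all` is carried over here, while the `BlobPrefetchConfig` conversion below
// defaults it to true because the legacy structure has no such field.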
1444
1445#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)]
1447struct BlobPrefetchConfig {
1448 pub enable: bool,
1450 pub threads_count: usize,
1452 pub merging_size: usize,
1454 pub bandwidth_rate: u32,
1456}
1457
1458impl From<&BlobPrefetchConfig> for PrefetchConfigV2 {
1459 fn from(v: &BlobPrefetchConfig) -> Self {
1460 PrefetchConfigV2 {
1461 enable: v.enable,
1462 threads: v.threads_count,
1463 batch_size: v.merging_size,
1464 bandwidth_limit: v.bandwidth_rate,
1465 prefetch_all: true,
1466 }
1467 }
1468}
1469
1470#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
1472pub(crate) struct BlobCacheEntryConfig {
1473 #[serde(default)]
1475 id: String,
1476 backend_type: String,
1478 backend_config: Value,
1482 cache_type: String,
1486 cache_config: Value,
1490 #[serde(default)]
1492 prefetch_config: BlobPrefetchConfig,
1493 #[serde(default)]
1495 metadata_path: Option<String>,
1496}
1497
1498impl TryFrom<&BlobCacheEntryConfig> for BlobCacheEntryConfigV2 {
1499 type Error = std::io::Error;
1500
1501 fn try_from(v: &BlobCacheEntryConfig) -> std::result::Result<Self, Self::Error> {
1502 let backend_config = BackendConfig {
1503 backend_type: v.backend_type.clone(),
1504 backend_config: v.backend_config.clone(),
1505 };
1506 let cache_config = CacheConfig {
1507 cache_type: v.cache_type.clone(),
1508 cache_compressed: false,
1509 cache_config: v.cache_config.clone(),
1510 cache_validate: false,
1511 prefetch_config: v.prefetch_config.clone(),
1512 };
1513 Ok(BlobCacheEntryConfigV2 {
1514 version: 2,
1515 id: v.id.clone(),
1516 backend: (&backend_config).try_into()?,
1517 cache: (&cache_config).try_into()?,
1518 metadata_path: v.metadata_path.clone(),
1519 })
1520 }
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525 use super::*;
1526 use crate::{BlobCacheEntry, BLOB_CACHE_TYPE_META_BLOB};
1527
1528 #[test]
1529 fn test_blob_prefetch_config() {
1530 let config = BlobPrefetchConfig::default();
1531 assert!(!config.enable);
1532 assert_eq!(config.threads_count, 0);
1533 assert_eq!(config.merging_size, 0);
1534 assert_eq!(config.bandwidth_rate, 0);
1535
1536 let content = r#"{
1537 "enable": true,
1538 "threads_count": 2,
1539 "merging_size": 4,
1540 "bandwidth_rate": 5
1541 }"#;
1542 let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap();
1543 assert!(config.enable);
1544 assert_eq!(config.threads_count, 2);
1545 assert_eq!(config.merging_size, 4);
1546 assert_eq!(config.bandwidth_rate, 5);
1547
1548 let config: PrefetchConfigV2 = (&config).into();
1549 assert!(config.enable);
1550 assert_eq!(config.threads, 2);
1551 assert_eq!(config.batch_size, 4);
1552 assert_eq!(config.bandwidth_limit, 5);
1553 assert!(config.prefetch_all);
1554 }
1555
1556 #[test]
1557 fn test_file_cache_config() {
1558 let config: FileCacheConfig = serde_json::from_str("{}").unwrap();
1559 assert_eq!(&config.work_dir, ".");
1560 assert!(!config.disable_indexed_map);
1561
1562 let config: FileCacheConfig =
1563 serde_json::from_str("{\"work_dir\":\"/tmp\",\"disable_indexed_map\":true}").unwrap();
1564 assert_eq!(&config.work_dir, "/tmp");
1565 assert!(config.get_work_dir().is_ok());
1566 assert!(config.disable_indexed_map);
1567
1568 let config: FileCacheConfig =
1569 serde_json::from_str("{\"work_dir\":\"/proc/mounts\",\"disable_indexed_map\":true}")
1570 .unwrap();
1571 assert!(config.get_work_dir().is_err());
1572 }
1573
1574 #[test]
1575 fn test_fs_cache_config() {
1576 let config: FsCacheConfig = serde_json::from_str("{}").unwrap();
1577 assert_eq!(&config.work_dir, ".");
1578
1579 let config: FsCacheConfig = serde_json::from_str("{\"work_dir\":\"/tmp\"}").unwrap();
1580 assert_eq!(&config.work_dir, "/tmp");
1581 assert!(config.get_work_dir().is_ok());
1582
1583 let config: FsCacheConfig =
1584 serde_json::from_str("{\"work_dir\":\"/proc/mounts\"}").unwrap();
1585 assert!(config.get_work_dir().is_err());
1586 }
1587
1588 #[test]
1589 fn test_blob_cache_entry() {
1590 let content = r#"{
1591 "type": "bootstrap",
1592 "id": "blob1",
1593 "config": {
1594 "id": "cache1",
1595 "backend_type": "localfs",
1596 "backend_config": {},
1597 "cache_type": "fscache",
1598 "cache_config": {},
1599 "prefetch_config": {
1600 "enable": true,
1601 "threads_count": 2,
1602 "merging_size": 4,
1603 "bandwidth_rate": 5
1604 },
1605 "metadata_path": "/tmp/metadata1"
1606 },
1607 "domain_id": "domain1"
1608 }"#;
1609 let config: BlobCacheEntry = serde_json::from_str(content).unwrap();
1610 assert_eq!(&config.blob_type, BLOB_CACHE_TYPE_META_BLOB);
1611 assert_eq!(&config.blob_id, "blob1");
1612 assert_eq!(&config.domain_id, "domain1");
1613
1614 let blob_config = config.blob_config_legacy.as_ref().unwrap();
1615 assert_eq!(blob_config.id, "cache1");
1616 assert_eq!(blob_config.backend_type, "localfs");
1617 assert_eq!(blob_config.cache_type, "fscache");
1618 assert!(blob_config.cache_config.is_object());
1619 assert!(blob_config.prefetch_config.enable);
1620 assert_eq!(blob_config.prefetch_config.threads_count, 2);
1621 assert_eq!(blob_config.prefetch_config.merging_size, 4);
1622 assert_eq!(
1623 blob_config.metadata_path.as_ref().unwrap().as_str(),
1624 "/tmp/metadata1"
1625 );
1626
1627 let blob_config: BlobCacheEntryConfigV2 = blob_config.try_into().unwrap();
1628 assert_eq!(blob_config.id, "cache1");
1629 assert_eq!(blob_config.backend.backend_type, "localfs");
1630 assert_eq!(blob_config.cache.cache_type, "fscache");
1631 assert!(blob_config.cache.fs_cache.is_some());
1632 assert!(blob_config.cache.prefetch.enable);
1633 assert_eq!(blob_config.cache.prefetch.threads, 2);
1634 assert_eq!(blob_config.cache.prefetch.batch_size, 4);
1635 assert_eq!(
1636 blob_config.metadata_path.as_ref().unwrap().as_str(),
1637 "/tmp/metadata1"
1638 );
1639
1640 let content = r#"{
1641 "type": "bootstrap",
1642 "id": "blob1",
1643 "config": {
1644 "id": "cache1",
1645 "backend_type": "localfs",
1646 "backend_config": {},
1647 "cache_type": "fscache",
1648 "cache_config": {},
1649 "metadata_path": "/tmp/metadata1"
1650 },
1651 "domain_id": "domain1"
1652 }"#;
1653 let config: BlobCacheEntry = serde_json::from_str(content).unwrap();
1654 let blob_config = config.blob_config_legacy.as_ref().unwrap();
1655 assert!(!blob_config.prefetch_config.enable);
1656 assert_eq!(blob_config.prefetch_config.threads_count, 0);
1657 assert_eq!(blob_config.prefetch_config.merging_size, 0);
1658 }
1659
1660 #[test]
1661 fn test_proxy_config() {
1662 let content = r#"{
1663 "url": "foo.com",
1664 "ping_url": "ping.foo.com",
1665 "fallback": true
1666 }"#;
1667 let config: ProxyConfig = serde_json::from_str(content).unwrap();
1668 assert_eq!(config.url, "foo.com");
1669 assert_eq!(config.ping_url, "ping.foo.com");
1670 assert!(config.fallback);
1671 assert_eq!(config.check_interval, 5);
1672 }
1673
1674 #[test]
1675 fn test_oss_config() {
1676 let content = r#"{
1677 "endpoint": "test",
1678 "access_key_id": "test",
1679 "access_key_secret": "test",
1680 "bucket_name": "antsys-nydus",
1681 "object_prefix":"nydus_v2/"
1682 }"#;
1683 let config: OssConfig = serde_json::from_str(content).unwrap();
1684 assert_eq!(config.scheme, "https");
1685 assert!(!config.skip_verify);
1686 assert_eq!(config.timeout, 5);
1687 assert_eq!(config.connect_timeout, 5);
1688 }
1689
1690 #[test]
1691 fn test_s3_config() {
1692 let content = r#"{
1693 "endpoint": "test",
1694 "region": "us-east-1",
1695 "access_key_id": "test",
1696 "access_key_secret": "test",
1697 "bucket_name": "antsys-nydus",
1698 "object_prefix":"nydus_v2/"
1699 }"#;
1700 let config: S3Config = serde_json::from_str(content).unwrap();
1701 assert_eq!(config.scheme, "https");
1702 assert!(!config.skip_verify);
1703 assert_eq!(config.timeout, 5);
1704 assert_eq!(config.connect_timeout, 5);
1705 }
1706
1707 #[test]
1708 fn test_registry_config() {
1709 let content = r#"{
1710 "scheme": "http",
1711 "skip_verify": true,
1712 "host": "my-registry:5000",
1713 "repo": "test/repo",
1714 "auth": "base64_encoded_auth",
1715 "registry_token": "bearer_token",
1716 "blob_redirected_host": "blob_redirected_host"
1717 }"#;
1718 let config: RegistryConfig = serde_json::from_str(content).unwrap();
1719 assert_eq!(config.scheme, "http");
1720 assert!(config.skip_verify);
1721 }
1722
1723 #[test]
1724 fn test_localfs_config() {
1725 let content = r#"{
1726 "blob_file": "blob_file",
1727 "dir": "blob_dir",
1728 "alt_dirs": ["dir1", "dir2"]
1729 }"#;
1730 let config: LocalFsConfig = serde_json::from_str(content).unwrap();
1731 assert_eq!(config.blob_file, "blob_file");
1732 assert_eq!(config.dir, "blob_dir");
1733 assert_eq!(config.alt_dirs, vec!["dir1", "dir2"]);
1734 }
1735
1736 #[test]
1737 fn test_localdisk_config() {
1738 let content = r#"{
1739 "device_path": "device_path"
1740 }"#;
1741 let config: LocalDiskConfig = serde_json::from_str(content).unwrap();
1742 assert_eq!(config.device_path, "device_path");
1743 }
1744
1745 #[test]
1746 fn test_backend_config() {
1747 let config = BackendConfig {
1748 backend_type: "localfs".to_string(),
1749 backend_config: Default::default(),
1750 };
1751 let str_val = serde_json::to_string(&config).unwrap();
1752 let config2 = serde_json::from_str(&str_val).unwrap();
1753
1754 assert_eq!(config, config2);
1755 }
1756
1757 #[test]
1758 fn test_v2_version() {
1759 let content = "version=2";
1760 let config: ConfigV2 = toml::from_str(content).unwrap();
1761 assert_eq!(config.version, 2);
1762 assert!(config.backend.is_none());
1763 }
1764
1765 #[test]
1766 fn test_v2_backend() {
1767 let content = r#"version=2
1768 [backend]
1769 type = "localfs"
1770 "#;
1771 let config: ConfigV2 = toml::from_str(content).unwrap();
1772 assert_eq!(config.version, 2);
1773 assert!(config.backend.is_some());
1774 assert!(config.cache.is_none());
1775
1776 let backend = config.backend.as_ref().unwrap();
1777 assert_eq!(&backend.backend_type, "localfs");
1778 assert!(backend.localfs.is_none());
1779 assert!(backend.oss.is_none());
1780 assert!(backend.registry.is_none());
1781 }
1782
1783 #[test]
1784 fn test_v2_backend_localfs() {
1785 let content = r#"version=2
1786 [backend]
1787 type = "localfs"
1788 [backend.localfs]
1789 blob_file = "/tmp/nydus.blob.data"
1790 dir = "/tmp"
1791 alt_dirs = ["/var/nydus/cache"]
1792 "#;
1793 let config: ConfigV2 = toml::from_str(content).unwrap();
1794 assert_eq!(config.version, 2);
1795 assert!(config.backend.is_some());
1796
1797 let backend = config.backend.as_ref().unwrap();
1798 assert_eq!(&backend.backend_type, "localfs");
1799 assert!(backend.localfs.is_some());
1800
1801 let localfs = backend.localfs.as_ref().unwrap();
1802 assert_eq!(&localfs.blob_file, "/tmp/nydus.blob.data");
1803 assert_eq!(&localfs.dir, "/tmp");
1804 assert_eq!(&localfs.alt_dirs[0], "/var/nydus/cache");
1805 }
1806
1807 #[test]
1808 fn test_v2_backend_oss() {
1809 let content = r#"version=2
1810 [backend]
1811 type = "oss"
1812 [backend.oss]
1813 endpoint = "my_endpoint"
1814 bucket_name = "my_bucket_name"
1815 object_prefix = "my_object_prefix"
1816 access_key_id = "my_access_key_id"
1817 access_key_secret = "my_access_key_secret"
1818 scheme = "http"
1819 skip_verify = true
1820 timeout = 10
1821 connect_timeout = 10
1822 retry_limit = 5
1823 [backend.oss.proxy]
1824 url = "localhost:6789"
1825 ping_url = "localhost:6789/ping"
1826 fallback = true
1827 check_interval = 10
1828 use_http = true
1829 [[backend.oss.mirrors]]
1830 host = "http://127.0.0.1:65001"
1831 ping_url = "http://127.0.0.1:65001/ping"
1832 health_check_interval = 10
1833 failure_limit = 10
1834 "#;
1835 let config: ConfigV2 = toml::from_str(content).unwrap();
1836 assert_eq!(config.version, 2);
1837 assert!(config.backend.is_some());
1838 assert!(config.rafs.is_none());
1839
1840 let backend = config.backend.as_ref().unwrap();
1841 assert_eq!(&backend.backend_type, "oss");
1842 assert!(backend.oss.is_some());
1843
1844 let oss = backend.oss.as_ref().unwrap();
1845 assert_eq!(&oss.endpoint, "my_endpoint");
1846 assert_eq!(&oss.bucket_name, "my_bucket_name");
1847 assert_eq!(&oss.object_prefix, "my_object_prefix");
1848 assert_eq!(&oss.access_key_id, "my_access_key_id");
1849 assert_eq!(&oss.access_key_secret, "my_access_key_secret");
1850 assert_eq!(&oss.scheme, "http");
1851 assert!(oss.skip_verify);
1852 assert_eq!(oss.timeout, 10);
1853 assert_eq!(oss.connect_timeout, 10);
1854 assert_eq!(oss.retry_limit, 5);
1855 assert_eq!(&oss.proxy.url, "localhost:6789");
1856 assert_eq!(&oss.proxy.ping_url, "localhost:6789/ping");
1857 assert_eq!(oss.proxy.check_interval, 10);
1858 assert!(oss.proxy.fallback);
1859 assert!(oss.proxy.use_http);
1860
1861 assert_eq!(oss.mirrors.len(), 1);
1862 let mirror = &oss.mirrors[0];
1863 assert_eq!(mirror.host, "http://127.0.0.1:65001");
1864 assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
1865 assert!(mirror.headers.is_empty());
1866 assert_eq!(mirror.health_check_interval, 10);
1867 assert_eq!(mirror.failure_limit, 10);
1868 }
1869
1870 #[test]
1871 fn test_v2_backend_registry() {
1872 let content = r#"version=2
1873 [backend]
1874 type = "registry"
1875 [backend.registry]
1876 scheme = "http"
1877 host = "localhost"
1878 repo = "nydus"
1879 auth = "auth"
1880 skip_verify = true
1881 timeout = 10
1882 connect_timeout = 10
1883 retry_limit = 5
1884 registry_token = "bear_token"
1885 blob_url_scheme = "https"
1886 blob_redirected_host = "redirect.registry.com"
1887 [backend.registry.proxy]
1888 url = "localhost:6789"
1889 ping_url = "localhost:6789/ping"
1890 fallback = true
1891 check_interval = 10
1892 use_http = true
1893 [[backend.registry.mirrors]]
1894 host = "http://127.0.0.1:65001"
1895 ping_url = "http://127.0.0.1:65001/ping"
1896 health_check_interval = 10
1897 failure_limit = 10
1898 "#;
1899 let config: ConfigV2 = toml::from_str(content).unwrap();
1900 assert_eq!(config.version, 2);
1901 assert!(config.backend.is_some());
1902 assert!(config.rafs.is_none());
1903
1904 let backend = config.backend.as_ref().unwrap();
1905 assert_eq!(&backend.backend_type, "registry");
1906 assert!(backend.registry.is_some());
1907
1908 let registry = backend.registry.as_ref().unwrap();
1909 assert_eq!(&registry.scheme, "http");
1910 assert_eq!(&registry.host, "localhost");
1911 assert_eq!(&registry.repo, "nydus");
1912 assert_eq!(registry.auth.as_ref().unwrap(), "auth");
1913 assert!(registry.skip_verify);
1914 assert_eq!(registry.timeout, 10);
1915 assert_eq!(registry.connect_timeout, 10);
1916 assert_eq!(registry.retry_limit, 5);
1917 assert_eq!(registry.registry_token.as_ref().unwrap(), "bear_token");
1918 assert_eq!(registry.blob_url_scheme, "https");
1919 assert_eq!(registry.blob_redirected_host, "redirect.registry.com");
1920
1921 assert_eq!(&registry.proxy.url, "localhost:6789");
1922 assert_eq!(&registry.proxy.ping_url, "localhost:6789/ping");
1923 assert_eq!(registry.proxy.check_interval, 10);
1924 assert!(registry.proxy.fallback);
1925 assert!(registry.proxy.use_http);
1926
1927 assert_eq!(registry.mirrors.len(), 1);
1928 let mirror = &registry.mirrors[0];
1929 assert_eq!(mirror.host, "http://127.0.0.1:65001");
1930 assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
1931 assert!(mirror.headers.is_empty());
1932 assert_eq!(mirror.health_check_interval, 10);
1933 assert_eq!(mirror.failure_limit, 10);
1934 }
1935
1936 #[test]
1937 fn test_v2_cache() {
1938 let content = r#"version=2
1939 [cache]
1940 type = "filecache"
1941 compressed = true
1942 validate = true
1943 [cache.filecache]
1944 work_dir = "/tmp"
1945 [cache.fscache]
1946 work_dir = "./"
1947 [cache.prefetch]
1948 enable = true
1949 threads = 8
1950 batch_size = 1000000
1951 bandwidth_limit = 10000000
1952 "#;
1953 let config: ConfigV2 = toml::from_str(content).unwrap();
1954 assert_eq!(config.version, 2);
1955 assert!(config.backend.is_none());
1956 assert!(config.rafs.is_none());
1957 assert!(config.cache.is_some());
1958
1959 let cache = config.cache.as_ref().unwrap();
1960 assert_eq!(&cache.cache_type, "filecache");
1961 assert!(cache.cache_compressed);
1962 assert!(cache.cache_validate);
1963 let filecache = cache.file_cache.as_ref().unwrap();
1964 assert_eq!(&filecache.work_dir, "/tmp");
1965 let fscache = cache.fs_cache.as_ref().unwrap();
1966 assert_eq!(&fscache.work_dir, "./");
1967
1968 let prefetch = &cache.prefetch;
1969 assert!(prefetch.enable);
1970 assert_eq!(prefetch.threads, 8);
1971 assert_eq!(prefetch.batch_size, 1000000);
1972 assert_eq!(prefetch.bandwidth_limit, 10000000);
1973 }
1974
1975 #[test]
1976 fn test_v2_rafs() {
1977 let content = r#"version=2
1978 [rafs]
1979 mode = "direct"
1980 batch_size = 1000000
1981 validate = true
1982 enable_xattr = true
1983 iostats_files = true
1984 access_pattern = true
1985 latest_read_files = true
1986 [rafs.prefetch]
1987 enable = true
1988 threads = 4
1989 batch_size = 1000000
1990 bandwidth_limit = 10000000
1991 prefetch_all = true
1992 "#;
1993 let config: ConfigV2 = toml::from_str(content).unwrap();
1994 assert_eq!(config.version, 2);
1995 assert!(config.backend.is_none());
1996 assert!(config.cache.is_none());
1997 assert!(config.rafs.is_some());
1998
1999 let rafs = config.rafs.as_ref().unwrap();
2000 assert_eq!(&rafs.mode, "direct");
2001 assert_eq!(rafs.batch_size, 1000000);
2002 assert!(rafs.validate);
2003 assert!(rafs.enable_xattr);
2004 assert!(rafs.iostats_files);
2005 assert!(rafs.access_pattern);
2006 assert!(rafs.latest_read_files);
2007 assert!(rafs.prefetch.enable);
2008 assert_eq!(rafs.prefetch.threads, 4);
2009 assert_eq!(rafs.prefetch.batch_size, 1000000);
2010 assert_eq!(rafs.prefetch.bandwidth_limit, 10000000);
2011 assert!(rafs.prefetch.prefetch_all);
2012 }
2013
2014 #[test]
2015 fn test_v2_blob_cache_entry() {
2016 let content = r#"version=2
2017 id = "my_id"
2018 metadata_path = "meta_path"
2019 [backend]
2020 type = "localfs"
2021 [backend.localfs]
2022 blob_file = "/tmp/nydus.blob.data"
2023 dir = "/tmp"
2024 alt_dirs = ["/var/nydus/cache"]
2025 [cache]
2026 type = "filecache"
2027 compressed = true
2028 validate = true
2029 [cache.filecache]
2030 work_dir = "/tmp"
2031 "#;
2032 let config: BlobCacheEntryConfigV2 = toml::from_str(content).unwrap();
2033 assert_eq!(config.version, 2);
2034 assert_eq!(&config.id, "my_id");
2035 assert_eq!(config.metadata_path.as_ref().unwrap(), "meta_path");
2036
2037 let backend = &config.backend;
2038 assert_eq!(&backend.backend_type, "localfs");
2039 assert!(backend.localfs.is_some());
2040
2041 let localfs = backend.localfs.as_ref().unwrap();
2042 assert_eq!(&localfs.blob_file, "/tmp/nydus.blob.data");
2043 assert_eq!(&localfs.dir, "/tmp");
2044 assert_eq!(&localfs.alt_dirs[0], "/var/nydus/cache");
2045 }
2046
2047 #[test]
2048 fn test_sample_config_file() {
2049 let content = r#"{
2050 "device": {
2051 "backend": {
2052 "type": "localfs",
2053 "config": {
2054 "dir": "/tmp/AM7TxD/blobs",
2055 "readahead": true
2056 }
2057 },
2058 "cache": {
2059 "type": "blobcache",
2060 "compressed": true,
2061 "config": {
2062 "work_dir": "/tmp/AM7TxD/cache"
2063 }
2064 }
2065 },
2066 "mode": "cached",
2067 "digest_validate": true,
2068 "iostats_files": false
2069 }
2070 "#;
2071 let config = ConfigV2::from_str(content).unwrap();
2072 assert_eq!(&config.id, "");
2073 }
2074
2075 #[test]
2076 fn test_snapshotter_sample_config() {
2077 let content = r#"
2078 {
2079 "device": {
2080 "backend": {
2081 "type": "registry",
2082 "config": {
2083 "readahead": false,
2084 "host": "localhost",
2085 "repo": "vke/golang",
2086 "auth": "",
2087 "scheme": "https",
2088 "proxy": {
2089 "fallback": false
2090 },
2091 "timeout": 5,
2092 "connect_timeout": 5,
2093 "retry_limit": 2
2094 }
2095 },
2096 "cache": {
2097 "type": "blobcache",
2098 "compressed": true,
2099 "config": {
2100 "work_dir": "/var/lib/containerd-nydus/cache",
2101 "disable_indexed_map": false
2102 }
2103 }
2104 },
2105 "mode": "direct",
2106 "digest_validate": false,
2107 "enable_xattr": true,
2108 "fs_prefetch": {
2109 "enable": true,
2110 "prefetch_all": true,
2111 "threads_count": 8,
2112 "merging_size": 1048576,
2113 "bandwidth_rate": 0
2114 }
2115 }
2116 "#;
2117 let config = ConfigV2::from_str(content).unwrap();
2118 assert_eq!(&config.id, "");
2119 }
2120
2121 #[test]
2122 fn test_backend_http_proxy_config() {
2123 let config =
2124 r#"{"version":2,"backend":{"type":"http-proxy","http-proxy":{"addr":"/tmp"}}}"#;
2125 let config = ConfigV2::from_str(config).unwrap();
2126 let backend = config.backend.unwrap();
2127 assert_eq!(&backend.backend_type, "http-proxy");
2128 assert_eq!(&backend.http_proxy.unwrap().addr, "/tmp");
2129 }
2130
2131 #[test]
2132 fn test_new_localfs() {
2133 let config = ConfigV2::new_localfs("id1", "./").unwrap();
2134 assert_eq!(&config.id, "id1");
2135 assert_eq!(config.backend.as_ref().unwrap().backend_type, "localfs");
2136 }
2137
2138 #[test]
2139 fn test_update_registry_auth_info() {
2140 let config = r#"
2141 {
2142 "device": {
2143 "id": "test",
2144 "backend": {
2145 "type": "registry",
2146 "config": {
2147 "readahead": false,
2148 "host": "docker.io",
2149 "repo": "library/nginx",
2150 "scheme": "https",
2151 "proxy": {
2152 "fallback": false
2153 },
2154 "timeout": 5,
2155 "connect_timeout": 5,
2156 "retry_limit": 8
2157 }
2158 }
2159 },
2160 "mode": "direct",
2161 "digest_validate": false,
2162 "enable_xattr": true,
2163 "fs_prefetch": {
2164 "enable": true,
2165 "threads_count": 10,
2166 "merging_size": 131072,
2167 "bandwidth_rate": 10485760
2168 }
2169 }"#;
2170
2171 let mut rafs_config = ConfigV2::from_str(config).unwrap();
2172 let test_auth = "test_auth".to_string();
2173
2174 rafs_config.update_registry_auth_info(&Some(test_auth.clone()));
2175
2176 let backend = rafs_config.backend.unwrap();
2177 let registry = backend.registry.unwrap();
2178 let auth = registry.auth.unwrap();
2179 assert_eq!(auth, test_auth);
2180 }
2181}