1use std::collections::HashMap;
13use std::time::Duration;
14use versions::Versioning;
15
16use crate::config::{Config, ContainerConfig};
17use crate::container::Container;
18use crate::error::TestError;
19use crate::image::Image;
20use crate::port::{Port, PortAccess};
21use crate::probe::{MessageProbe, MessageSource};
22use crate::service::Service;
23use crate::services::flex::dynamic::DynamicFlexConfig;
24
/// Base path passed as the second element of every mount produced by
/// `FlexConfig::to_container_config`. The log message emitted there shows it as
/// the leading part of the in-container path.
const FLEX_LOCAL_BASE: &str = "/usr/local/share";
/// Path segment prepended to the per-mount Flex directory name when building
/// the third element of every mount.
const FLEX_CONFIG_BASE: &str = "mulesoft/flex-gateway/conf.d";
/// URL scheme reported by `Config::schema` and used as the prefix of
/// `Flex::external_url`.
const FLEX_SCHEMA: &str = "http";

/// Image repository used by `FlexConfig::new` when the
/// `PDK_TEST_FLEX_IMAGE_NAME` environment variable cannot be read.
pub const FLEX_IMAGE_NAME: &str = "mulesoft/flex-gateway";
31
// Private submodules; selected items are re-exported below.
mod api;
mod dynamic;
mod gcl;
mod policy;

// These submodules are compiled only when the `experimental` feature is enabled.
#[cfg(feature = "experimental")]
mod policy_binding;
#[cfg(feature = "experimental")]
mod service;

// Public re-exports of the configuration types and their builders.
pub use api::{ApiConfig, ApiConfigBuilder};
pub use policy::{PolicyConfig, PolicyConfigBuilder};

// Re-exports that exist only when the `experimental` feature is enabled.
#[cfg(feature = "experimental")]
pub use policy_binding::{
    PolicyBindingConfig, PolicyBindingConfigBuilder, Target, TargetBuilder, TargetKind,
};
#[cfg(feature = "experimental")]
pub use service::{UpstreamServiceConfig, UpstreamServiceConfigBuilder};
51
/// Configuration of a Flex Gateway container.
///
/// Instances are created with [`FlexConfig::new`] (defaults) or assembled
/// through [`FlexConfig::builder`].
#[derive(Debug, Clone)]
pub struct FlexConfig {
    /// Hostname given to the container; defaults to `local-flex`.
    hostname: String,
    /// Image repository handed to `Image::from_repository`.
    image_name: String,
    /// Image version; also selects how many readiness messages are awaited.
    version: String,
    /// `(host, flex)` pairs: `host` is mounted, `flex` is appended to the
    /// Flex configuration base path.
    config_mounts: Vec<(String, String)>,
    /// Ports published by the container; the first one is the one reported by
    /// `Config::port`.
    ports: Vec<Port>,
    /// Timeout applied to the readiness probe; defaults to 60 seconds.
    timeout: Duration,
    /// Configuration accumulated through the builder's `with_*` methods; its
    /// `dirs()` contribute additional mounts.
    dynamic_config: DynamicFlexConfig,
}
63
64impl FlexConfig {
65 pub fn hostname(&self) -> &str {
67 &self.hostname
68 }
69
70 pub fn version(&self) -> &str {
72 &self.version
73 }
74
75 pub fn image_name(&self) -> &str {
77 &self.image_name
78 }
79
80 pub fn config_mounts(&self) -> &[(String, String)] {
82 &self.config_mounts
83 }
84
85 pub fn ports(&self) -> &[Port] {
87 &self.ports
88 }
89
90 pub fn timeout(&self) -> Duration {
92 self.timeout
93 }
94
95 pub fn new() -> Self {
97 Self {
98 hostname: "local-flex".to_string(),
99 version: std::env::var("PDK_TEST_FLEX_IMAGE_VERSION")
100 .unwrap_or_else(|_| "latest".to_string()),
101 image_name: std::env::var("PDK_TEST_FLEX_IMAGE_NAME")
102 .unwrap_or_else(|_| FLEX_IMAGE_NAME.to_string()),
103 config_mounts: vec![],
104 ports: vec![],
105 timeout: Duration::from_secs(60),
106 dynamic_config: DynamicFlexConfig::new(),
107 }
108 }
109
110 pub fn builder() -> FlexConfigBuilder {
112 FlexConfigBuilder::new()
113 }
114}
115
116impl Default for FlexConfig {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
/// Builder for [`FlexConfig`].
///
/// Every method consumes the builder and returns it, so calls can be chained
/// and finished with `build()`.
#[derive(Debug)]
pub struct FlexConfigBuilder {
    /// Configuration being assembled; starts as `FlexConfig::new()`.
    config: FlexConfig,
}
127
128impl FlexConfigBuilder {
129 fn new() -> Self {
130 Self {
131 config: FlexConfig::new(),
132 }
133 }
134
135 pub fn hostname<T: Into<String>>(self, hostname: T) -> Self {
138 Self {
139 config: FlexConfig {
140 hostname: hostname.into(),
141 ..self.config
142 },
143 }
144 }
145
146 pub fn version<T: Into<String>>(self, version: T) -> Self {
149 Self {
150 config: FlexConfig {
151 version: version.into(),
152 ..self.config
153 },
154 }
155 }
156
157 pub fn image_name<T: Into<String>>(self, image_name: T) -> Self {
160 Self {
161 config: FlexConfig {
162 image_name: image_name.into(),
163 ..self.config
164 },
165 }
166 }
167
168 pub fn timeout(self, timeout: Duration) -> Self {
170 Self {
171 config: FlexConfig {
172 timeout,
173 ..self.config
174 },
175 }
176 }
177
178 pub fn config_mounts<T, S, D>(self, config_mounts: T) -> Self
180 where
181 T: IntoIterator<Item = (S, D)>,
182 S: Into<String>,
183 D: Into<String>,
184 {
185 Self {
186 config: FlexConfig {
187 config_mounts: config_mounts
188 .into_iter()
189 .map(|(s, d)| (s.into(), d.into()))
190 .collect(),
191 ..self.config
192 },
193 }
194 }
195
196 pub fn ports<T>(self, ports: T) -> Self
198 where
199 T: IntoIterator<Item = Port>,
200 {
201 Self {
202 config: FlexConfig {
203 ports: ports.into_iter().collect(),
204 ..self.config
205 },
206 }
207 }
208
209 pub fn with_api(self, api: ApiConfig) -> Self {
210 let mut ports = self.config.ports;
211 ports.push(api.port);
212 Self {
213 config: FlexConfig {
214 ports,
215 dynamic_config: self.config.dynamic_config.with_api(api),
216 ..self.config
217 },
218 }
219 }
220
221 #[cfg(feature = "experimental")]
222 pub fn with_upstream_service(self, service: UpstreamServiceConfig) -> Self {
223 Self {
224 config: FlexConfig {
225 dynamic_config: self.config.dynamic_config.with_upstream_service(service),
226 ..self.config
227 },
228 }
229 }
230
231 #[cfg(feature = "experimental")]
232 pub fn with_policy_binding(self, policy_binding: PolicyBindingConfig) -> Self {
233 Self {
234 config: FlexConfig {
235 dynamic_config: self
236 .config
237 .dynamic_config
238 .with_policy_bindings(policy_binding),
239 ..self.config
240 },
241 }
242 }
243
244 pub fn build(self) -> FlexConfig {
246 self.config
247 }
248}
249
250fn readiness(version: &str, timeout: Duration) -> Result<MessageProbe, TestError> {
251 let versioning = Versioning::new(version)
252 .ok_or_else(|| TestError::Startup(format!("Unable to parse Flex version `{version}`.")))?;
253 let times = if version == "latest" || versioning >= Versioning::new("1.7.0").unwrap() {
254 1
255 } else if versioning >= Versioning::new("1.4.0").unwrap() {
256 2
257 } else {
258 1
259 };
260
261 Ok(MessageProbe::builder("cds: added/updated")
262 .times(times)
263 .timeout(timeout)
264 .source(MessageSource::StdOut)
265 .build())
266}
267
268impl Config for FlexConfig {
269 fn hostname(&self) -> &str {
270 &self.hostname
271 }
272
273 fn port(&self) -> Port {
274 self.ports.first().cloned().unwrap_or_default()
275 }
276
277 fn schema(&self) -> &str {
278 FLEX_SCHEMA
279 }
280
281 fn to_container_config(&self) -> Result<ContainerConfig, TestError> {
282 let config_mounts = self.config_mounts.iter();
283 let dynamic = self.dynamic_config.dirs()?;
284
285 for (host, flex) in dynamic.iter() {
286 log::info!(
287 "Applying Flex config mount host={host} -> container={FLEX_LOCAL_BASE}/{FLEX_CONFIG_BASE}/{flex}"
288 );
289 }
290
291 let mounts = config_mounts.chain(dynamic.iter()).map(|(host, flex)| {
292 (
293 host.clone(),
294 FLEX_LOCAL_BASE.to_string(),
295 format!("{FLEX_CONFIG_BASE}/{flex}"),
296 )
297 });
298 let ports = self.ports.iter().map(|&m| PortAccess::published(m));
299
300 Ok(ContainerConfig::builder(
301 self.hostname.clone(),
302 Image::from_repository(&self.image_name).with_version(&self.version),
303 )
304 .ports(ports)
305 .mounts(mounts)
306 .readiness(readiness(&self.version, self.timeout)?)
307 .build())
308 }
309}
310
311#[derive(Default, Clone)]
313pub struct Flex {
314 sockets: HashMap<Port, String>,
315}
316
317impl Flex {
318 pub fn external_url(&self, port: Port) -> Option<String> {
321 self.sockets
322 .get(&port)
323 .map(|socket| format!("{FLEX_SCHEMA}://{socket}"))
324 }
325}
326
327impl Service for Flex {
328 type Config = FlexConfig;
329
330 fn new(_config: &Self::Config, container: &Container) -> Self {
331 Self {
332 sockets: container.sockets().clone(),
333 }
334 }
335}
336
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::error::TestError;

    use super::readiness;

    /// Timeout handed to every probe under test; the assertions only look at
    /// the expected message count.
    const TIMEOUT: Duration = Duration::from_secs(1);

    /// Versions older than 1.4.0 wait for a single message.
    #[test]
    fn readiness_for_1_3_0() -> Result<(), TestError> {
        readiness("1.3.0", TIMEOUT).map(|probe| assert_eq!(probe.times(), 1))
    }

    /// Versions in the 1.4.0..1.7.0 window wait for two messages.
    #[test]
    fn readiness_for_1_6_0() -> Result<(), TestError> {
        readiness("1.6.0", TIMEOUT).map(|probe| assert_eq!(probe.times(), 2))
    }

    /// Versions from 1.7.0 onwards wait for a single message again.
    #[test]
    fn readiness_for_1_7_0() -> Result<(), TestError> {
        readiness("1.7.0", TIMEOUT).map(|probe| assert_eq!(probe.times(), 1))
    }

    /// The `latest` tag waits for a single message.
    #[test]
    fn readiness_for_latest() -> Result<(), TestError> {
        readiness("latest", TIMEOUT).map(|probe| assert_eq!(probe.times(), 1))
    }
}