Skip to main content

tank_tests/
service.rs

1#![allow(unused_imports)]
2use std::sync::LazyLock;
3use tank::{
4    AsValue, Entity, Error, Executor, QueryBuilder, Result, Value, cols, expr,
5    stream::{StreamExt, TryStreamExt},
6};
7use tokio::sync::Mutex;
8
9static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
10
11#[derive(Clone, Debug, PartialEq, Eq)]
12pub struct HostPort {
13    pub host: String,
14    pub port: u16,
15}
16
17impl HostPort {
18    pub fn new(host: impl Into<String>, port: u16) -> Self {
19        Self {
20            host: host.into(),
21            port,
22        }
23    }
24}
25
26impl AsValue for HostPort {
27    fn as_empty_value() -> Value {
28        Value::Varchar(None)
29    }
30
31    fn as_value(self) -> Value {
32        Value::Varchar(Some(format!("{}:{}", self.host, self.port).into()))
33    }
34
35    fn try_from_value(value: Value) -> Result<Self>
36    where
37        Self: Sized,
38    {
39        if let Value::Varchar(Some(v), ..) = value.try_as(&Value::Varchar(None))? {
40            let (host, port) = v
41                .split_once(':')
42                .ok_or_else(|| Error::msg(format!("Invalid HostPort `{v}`")))?;
43
44            return Ok(Self {
45                host: host.to_string(),
46                port: port
47                    .parse::<u16>()
48                    .map_err(|_| Error::msg(format!("Invalid port in HostPort `{v}`")))?,
49            });
50        }
51        Err(Error::msg("Unexpected value for HostPort"))
52    }
53}
54
55#[derive(Entity, Clone, Debug, PartialEq, Eq)]
56#[tank(schema = "ops")]
57#[tank(primary_key = (name, addr))]
58struct Service {
59    name: String,
60    #[tank(clustering_key)]
61    addr: HostPort,
62    backup_addr: Option<HostPort>,
63}
64
65pub async fn service(executor: &mut impl Executor) {
66    let _lock = MUTEX.lock().await;
67
68    // Setup
69    Service::drop_table(executor, true, false)
70        .await
71        .expect("Failed to drop Service");
72    Service::create_table(executor, false, true)
73        .await
74        .expect("Failed to create Service");
75
76    // Query
77    let mut api = Service {
78        addr: HostPort::new("api.internal", 443),
79        name: "api".into(),
80        backup_addr: None,
81    };
82    api.save(executor).await.expect("Failed to save api");
83    let loaded = Service::find_one(executor, api.primary_key_expr())
84        .await
85        .expect("Failed to load api")
86        .expect("Missing api");
87    assert_eq!(
88        loaded,
89        Service {
90            addr: HostPort::new("api.internal", 443),
91            name: "api".into(),
92            backup_addr: None,
93        }
94    );
95
96    api.backup_addr = Some(HostPort::new("api.internal", 8443));
97    api.save(executor)
98        .await
99        .expect("Failed to update api backup");
100    let loaded = Service::find_one(executor, api.primary_key_expr())
101        .await
102        .expect("Failed to reload api")
103        .expect("Missing api after update");
104    assert_eq!(
105        loaded,
106        Service {
107            addr: HostPort::new("api.internal", 443),
108            name: "api".into(),
109            backup_addr: HostPort::new("api.internal", 8443).into(),
110        }
111    );
112
113    let mut query = Service::prepare_find(executor, expr!(Service::name == ?), None)
114        .await
115        .expect("Failed to prepare query by name");
116    query.bind("api").expect("Failed to bind name parameter");
117
118    let api_by_addr = executor
119        .fetch(query)
120        .map_ok(Service::from_row)
121        .map(Result::flatten)
122        .try_collect::<Vec<_>>()
123        .await
124        .expect("Failed to query by addr");
125    assert_eq!(
126        api_by_addr,
127        [Service {
128            addr: HostPort::new("api.internal", 443),
129            name: "api".into(),
130            backup_addr: HostPort::new("api.internal", 8443).into(),
131        }]
132    );
133
134    let api_canary = Service {
135        addr: HostPort::new("api.internal", 8444),
136        name: "api".into(),
137        backup_addr: None,
138    };
139    Service::insert_one(executor, &api_canary)
140        .await
141        .expect("Failed to insert api canary");
142
143    let web = Service {
144        addr: HostPort::new("web.internal", 80),
145        name: "web".into(),
146        backup_addr: Some(HostPort::new("web.internal", 8080)),
147    };
148    Service::insert_one(executor, &web)
149        .await
150        .expect("Failed to insert web");
151    #[cfg(not(feature = "disable-ordering"))]
152    {
153        let rows = executor
154            .fetch(
155                QueryBuilder::new()
156                    .select(Service::columns())
157                    .from(Service::table())
158                    .where_expr(expr!(Service::name == "api"))
159                    .order_by(cols!(Service::addr ASC))
160                    .build(&executor.driver()),
161            )
162            .map_ok(Service::from_row)
163            .map(Result::flatten)
164            .try_collect::<Vec<_>>()
165            .await
166            .expect("Failed to select ordered services");
167
168        assert_eq!(
169            rows,
170            [
171                Service {
172                    addr: HostPort::new("api.internal", 443),
173                    name: "api".into(),
174                    backup_addr: HostPort::new("api.internal", 8443).into(),
175                },
176                Service {
177                    addr: HostPort::new("api.internal", 8444),
178                    name: "api".into(),
179                    backup_addr: None,
180                },
181            ]
182        );
183    }
184}