1#![allow(unused_imports)]
2use std::sync::LazyLock;
3use tank::{
4 AsValue, Entity, Error, Executor, QueryBuilder, Result, Value, cols, expr,
5 stream::{StreamExt, TryStreamExt},
6};
7use tokio::sync::Mutex;
8
/// Global async mutex, created lazily on first use. `service` locks it as its
/// first step and keeps the guard until it returns, so overlapping calls to
/// `service` run one after another.
static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
10
/// A network endpoint expressed as a host name plus a TCP/UDP-style port number.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HostPort {
    /// Host part of the endpoint, kept exactly as supplied by the caller.
    pub host: String,
    /// Port part of the endpoint.
    pub port: u16,
}

impl HostPort {
    /// Builds a [`HostPort`] from anything convertible into a `String` and a port.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        HostPort { host, port }
    }
}
25
26impl AsValue for HostPort {
27 fn as_empty_value() -> Value {
28 Value::Varchar(None)
29 }
30
31 fn as_value(self) -> Value {
32 Value::Varchar(Some(format!("{}:{}", self.host, self.port).into()))
33 }
34
35 fn try_from_value(value: Value) -> Result<Self>
36 where
37 Self: Sized,
38 {
39 if let Value::Varchar(Some(v), ..) = value.try_as(&Value::Varchar(None))? {
40 let (host, port) = v
41 .split_once(':')
42 .ok_or_else(|| Error::msg(format!("Invalid HostPort `{v}`")))?;
43
44 return Ok(Self {
45 host: host.to_string(),
46 port: port
47 .parse::<u16>()
48 .map_err(|_| Error::msg(format!("Invalid port in HostPort `{v}`")))?,
49 });
50 }
51 Err(Error::msg("Unexpected value for HostPort"))
52 }
53}
54
// Entity used by `service` to exercise `HostPort` both as a required field
// (`addr`) and as an optional one (`backup_addr`).
// Plain `//` comments are used on purpose so the `Entity` derive never sees
// doc attributes.
#[derive(Entity, Clone, Debug, PartialEq, Eq)]
// Declares the table under the "ops" schema.
#[tank(schema = "ops")]
// Composite primary key made of `name` and `addr`.
#[tank(primary_key = (name, addr))]
struct Service {
    // Service name; first component of the primary key.
    name: String,
    // Main endpoint; second component of the primary key.
    // NOTE(review): `clustering_key` presumably only affects drivers that have
    // a clustering concept — confirm against the tank documentation.
    #[tank(clustering_key)]
    addr: HostPort,
    // Optional secondary endpoint; `service` stores both `None` and `Some(..)` here.
    backup_addr: Option<HostPort>,
}
64
65pub async fn service(executor: &mut impl Executor) {
66 let _lock = MUTEX.lock().await;
67
68 Service::drop_table(executor, true, false)
70 .await
71 .expect("Failed to drop Service");
72 Service::create_table(executor, false, true)
73 .await
74 .expect("Failed to create Service");
75
76 let mut api = Service {
78 addr: HostPort::new("api.internal", 443),
79 name: "api".into(),
80 backup_addr: None,
81 };
82 api.save(executor).await.expect("Failed to save api");
83 let loaded = Service::find_one(executor, api.primary_key_expr())
84 .await
85 .expect("Failed to load api")
86 .expect("Missing api");
87 assert_eq!(
88 loaded,
89 Service {
90 addr: HostPort::new("api.internal", 443),
91 name: "api".into(),
92 backup_addr: None,
93 }
94 );
95
96 api.backup_addr = Some(HostPort::new("api.internal", 8443));
97 api.save(executor)
98 .await
99 .expect("Failed to update api backup");
100 let loaded = Service::find_one(executor, api.primary_key_expr())
101 .await
102 .expect("Failed to reload api")
103 .expect("Missing api after update");
104 assert_eq!(
105 loaded,
106 Service {
107 addr: HostPort::new("api.internal", 443),
108 name: "api".into(),
109 backup_addr: HostPort::new("api.internal", 8443).into(),
110 }
111 );
112
113 let mut query = Service::prepare_find(executor, expr!(Service::name == ?), None)
114 .await
115 .expect("Failed to prepare query by name");
116 query.bind("api").expect("Failed to bind name parameter");
117
118 let api_by_addr = executor
119 .fetch(query)
120 .map_ok(Service::from_row)
121 .map(Result::flatten)
122 .try_collect::<Vec<_>>()
123 .await
124 .expect("Failed to query by addr");
125 assert_eq!(
126 api_by_addr,
127 [Service {
128 addr: HostPort::new("api.internal", 443),
129 name: "api".into(),
130 backup_addr: HostPort::new("api.internal", 8443).into(),
131 }]
132 );
133
134 let api_canary = Service {
135 addr: HostPort::new("api.internal", 8444),
136 name: "api".into(),
137 backup_addr: None,
138 };
139 Service::insert_one(executor, &api_canary)
140 .await
141 .expect("Failed to insert api canary");
142
143 let web = Service {
144 addr: HostPort::new("web.internal", 80),
145 name: "web".into(),
146 backup_addr: Some(HostPort::new("web.internal", 8080)),
147 };
148 Service::insert_one(executor, &web)
149 .await
150 .expect("Failed to insert web");
151 #[cfg(not(feature = "disable-ordering"))]
152 {
153 let rows = executor
154 .fetch(
155 QueryBuilder::new()
156 .select(Service::columns())
157 .from(Service::table())
158 .where_expr(expr!(Service::name == "api"))
159 .order_by(cols!(Service::addr ASC))
160 .build(&executor.driver()),
161 )
162 .map_ok(Service::from_row)
163 .map(Result::flatten)
164 .try_collect::<Vec<_>>()
165 .await
166 .expect("Failed to select ordered services");
167
168 assert_eq!(
169 rows,
170 [
171 Service {
172 addr: HostPort::new("api.internal", 443),
173 name: "api".into(),
174 backup_addr: HostPort::new("api.internal", 8443).into(),
175 },
176 Service {
177 addr: HostPort::new("api.internal", 8444),
178 name: "api".into(),
179 backup_addr: None,
180 },
181 ]
182 );
183 }
184}