container_registry/
test_support.rs1use std::{net::SocketAddr, sync::Arc, thread};
28
29use axum::{body::Body, routing::RouterIntoService};
30use tokio::runtime::Runtime;
31use tower_http::trace::TraceLayer;
32
33use super::{
34 auth::{self, Permissions},
35 ContainerRegistry, ContainerRegistryBuilder,
36};
37
38pub struct TestingContainerRegistry {
40 pub registry: Arc<ContainerRegistry>,
42 pub temp_storage: Option<tempdir::TempDir>,
44 pub body_limit: usize,
46 pub bind_addr: SocketAddr,
48}
49
50pub struct RunningRegistry {
54 bound_addr: SocketAddr,
55 join_handle: Option<thread::JoinHandle<()>>,
56 _temp_storage: Option<tempdir::TempDir>,
57 shutdown: Option<tokio::sync::mpsc::Sender<()>>,
58}
59
60impl RunningRegistry {
61 pub fn bound_addr(&self) -> SocketAddr {
63 self.bound_addr
64 }
65}
66
67impl Drop for RunningRegistry {
68 fn drop(&mut self) {
69 drop(self.shutdown.take());
71
72 if let Some(join_handle) = self.join_handle.take() {
74 join_handle.join().expect("failed to join");
75 }
76
77 }
79}
80
81impl TestingContainerRegistry {
82 pub fn make_service(&self) -> RouterIntoService<Body> {
84 self.registry
85 .clone()
86 .make_router()
87 .layer(TraceLayer::new_for_http())
88 .into_service::<Body>()
89 }
90
91 pub fn bind(&mut self, addr: SocketAddr) -> &mut Self {
93 self.bind_addr = addr;
94 self
95 }
96
97 pub fn body_limit(&mut self, body_limit: usize) -> &mut Self {
99 self.body_limit = body_limit;
100 self
101 }
102
103 pub fn run_in_background(mut self) -> RunningRegistry {
108 let app = axum::Router::new()
109 .merge(self.registry.clone().make_router())
110 .layer(axum::extract::DefaultBodyLimit::max(self.body_limit));
111
112 let listener =
113 std::net::TcpListener::bind(self.bind_addr).expect("could not bind listener");
114 listener
115 .set_nonblocking(true)
116 .expect("could not set listener to non-blocking");
117 let bound_addr = listener.local_addr().expect("failed to get local address");
118
119 let (shutdown_sender, mut shutdown_receiver) = tokio::sync::mpsc::channel::<()>(1);
120 let rt = Runtime::new().expect("could not create tokio runtime");
121 let join_handle = thread::spawn(move || {
122 rt.block_on(async move {
123 let listener = tokio::net::TcpListener::from_std(listener)
124 .expect("could not create tokio listener");
125
126 axum::serve(listener, app)
127 .with_graceful_shutdown(async move {
128 shutdown_receiver.recv().await;
129 })
130 .await
131 .expect("axum io error");
132 })
133 });
134
135 RunningRegistry {
136 bound_addr,
137 join_handle: Some(join_handle),
138 shutdown: Some(shutdown_sender),
139 _temp_storage: self.temp_storage.take(),
140 }
141 }
142
143 pub fn registry(&self) -> &ContainerRegistry {
145 &self.registry
146 }
147}
148
149impl ContainerRegistryBuilder {
150 pub fn build_for_testing(mut self) -> TestingContainerRegistry {
163 let temp_storage = if self.storage.is_none() {
164 let temp_storage = tempdir::TempDir::new("container-registry-for-testing").expect(
165 "could not create temporary directory to host testing container registry instance",
166 );
167 self = self.storage(temp_storage.path());
168 Some(temp_storage)
169 } else {
170 None
171 };
172
173 if self.auth_provider.is_none() {
174 self = self.auth_provider(Arc::new(auth::Anonymous::new(
175 Permissions::ReadWrite,
176 Permissions::ReadWrite,
177 )));
178 }
179
180 let registry = self.build().expect("could not create registry");
181
182 TestingContainerRegistry {
183 registry,
184 temp_storage,
185 bind_addr: ([127, 0, 0, 1], 0).into(),
186 body_limit: 100 * 1024 * 1024,
187 }
188 }
189}