starlane_core/star/shell/
pledge.rs1use std::collections::HashSet;
2use std::iter::FromIterator;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use futures::TryFutureExt;
7use rusqlite::{Connection, params, params_from_iter, ToSql};
8use rusqlite::types::{ToSqlOutput, Value, ValueRef};
9use tokio::sync::{mpsc, oneshot};
10use tokio::time::Duration;
11
12use starlane_resources::message::Fail;
13use starlane_resources::ResourceAssign;
14
15use crate::error::Error;
16use crate::resource::{
17 RemoteResourceHost, ResourceHost, ResourceLocationAffinity, ResourceType,
18};
19use crate::star::{StarCommand, StarConscriptKind, StarInfo, StarKey, StarKind, StarSkel};
20
21#[derive(Clone)]
22pub struct StarWranglerBacking {
23 tx: mpsc::Sender<StarHandleAction>,
24 star_tx: mpsc::Sender<StarCommand>,
25}
26
27impl StarWranglerBacking {
28 pub async fn new(star_tx: mpsc::Sender<StarCommand>) -> Self {
29 StarWranglerBacking {
30 tx: StarConscriptDB::new().await,
31 star_tx: star_tx,
32 }
33 }
34
35 pub async fn add_star_handle(&self, handle: StarConscript) -> Result<(), Error> {
36 let (action, rx) = StarHandleAction::new(StarConscriptCall::SetStar(handle));
37 self.tx.send(action).await?;
38 tokio::time::timeout(Duration::from_secs(5), rx).await??;
39 self.star_tx.send(StarCommand::CheckStatus).await;
40 Ok(())
41 }
42
43 pub async fn select(&self, selector: StarSelector) -> Result<Vec<StarConscript>, Error> {
44 let (action, rx) = StarHandleAction::new(StarConscriptCall::Select(selector));
45 self.tx.send(action).await?;
46 let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
47 match result {
48 StarConscriptResult::StarConscripts(handles) => Ok(handles),
49 _what => Err(Fail::expected("StarHandleResult::StarHandles(handles)").into()),
50 }
51 }
52
53 pub async fn next(&self, selector: StarSelector) -> Result<StarConscript, Error> {
54 let (action, rx) = StarHandleAction::new(StarConscriptCall::Next(selector));
55 self.tx.send(action).await?;
56 let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
57 match result {
58 StarConscriptResult::StarConscript(handle) => Ok(handle),
59 _what => Err(Fail::expected("StarHandleResult::StarHandle(handle)").into()),
60 }
61 }
62
63 pub async fn satisfied(&self, set: HashSet<StarConscriptKind>) -> Result<StarConscriptionSatisfaction, Error> {
65 let (action, rx) = StarHandleAction::new(StarConscriptCall::CheckSatisfaction(set));
66 self.tx.send(action).await?;
67 let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
68 match result {
69 StarConscriptResult::Satisfaction(satisfaction) => Ok(satisfaction),
70 _what => Err(Fail::expected("StarHandleResult::Satisfaction(_)").into()),
71 }
72 }
73}
74
75#[derive(Debug, Clone)]
76pub struct ResourceHostSelector {
77 skel: StarSkel,
78}
79
80impl ResourceHostSelector {
81 pub fn new(skel: StarSkel) -> Self {
82 ResourceHostSelector { skel: skel }
83 }
84
85 pub async fn select(&self, resource_type: ResourceType) -> Result<Arc<dyn ResourceHost>, Error> {
86 if StarKind::hosts(&resource_type) == self.skel.info.kind {
87 let handle = StarConscript {
88 key: self.skel.info.key.clone(),
89 kind: self.skel.info.kind.clone(),
90 hops: None,
91 };
92 let host = RemoteResourceHost {
93 skel: self.skel.clone(),
94 handle: handle,
95 };
96 Ok(Arc::new(host))
97 } else {
98 let handler = self.skel.star_handler.as_ref().ok_or(format!(
99 "non-manager star {} does not have a host star selector",
100 self.skel.info.kind.to_string()
101 ))?;
102 let mut selector = StarSelector::new();
103 selector.add(StarFieldSelection::Kind(StarKind::hosts(&resource_type)));
104 let handle = handler.next(selector).await?;
105
106 let host = RemoteResourceHost {
107 skel: self.skel.clone(),
108 handle: handle,
109 };
110
111 Ok(Arc::new(host))
112 }
113 }
114}
115
116pub struct StarConscript {
117 pub key: StarKey,
118 pub kind: StarKind,
119 pub hops: Option<usize>,
120}
121
122pub struct StarSelector {
123 fields: HashSet<StarFieldSelection>,
124}
125
126impl ToString for StarSelector {
127 fn to_string(&self) -> String {
128 let mut rtn = String::new();
129
130 for (index, field) in self.fields.iter().enumerate() {
131 if index > 0 {
132 rtn.push_str(", ");
133 }
134 rtn.push_str(field.to_string().as_str());
135 }
136
137 rtn
138 }
139}
140
141impl StarSelector {
142 pub fn new() -> Self {
143 StarSelector {
144 fields: HashSet::new(),
145 }
146 }
147 pub fn is_empty(&self) -> bool {
148 self.fields.is_empty()
149 }
150
151 pub fn add(&mut self, field: StarFieldSelection) {
152 self.fields.insert(field);
153 }
154}
155
156#[derive(Clone, Hash, Eq, PartialEq)]
157pub enum StarFieldSelection {
158 Kind(StarKind),
159 MinHops,
160}
161
162impl ToString for StarFieldSelection {
163 fn to_string(&self) -> String {
164 match self {
165 StarFieldSelection::Kind(kind) => format!("Kind:{}", kind.to_string()),
166 StarFieldSelection::MinHops => format!("MinHops"),
167 }
168 }
169}
170
171impl ToSql for StarFieldSelection {
172 fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
173 match self {
174 StarFieldSelection::Kind(kind) => Ok(ToSqlOutput::Owned(Value::Text(kind.to_string()))),
175 StarFieldSelection::MinHops => Ok(ToSqlOutput::Owned(Value::Null)),
176 }
177 }
178}
179
180impl StarFieldSelection {
181 pub fn is_param(&self) -> bool {
182 match self {
183 StarFieldSelection::Kind(_) => true,
184 StarFieldSelection::MinHops => false,
185 }
186 }
187}
188
189pub struct StarHandleAction {
190 pub command: StarConscriptCall,
191 pub tx: oneshot::Sender<StarConscriptResult>,
192}
193
194impl StarHandleAction {
195 pub fn new(command: StarConscriptCall) -> (Self, oneshot::Receiver<StarConscriptResult>) {
196 let (tx, rx) = oneshot::channel();
197 (
198 StarHandleAction {
199 tx: tx,
200 command: command,
201 },
202 rx,
203 )
204 }
205}
206
207#[derive(strum_macros::Display)]
208pub enum StarConscriptCall {
209 Close,
210 SetStar(StarConscript),
211 Select(StarSelector),
212 Next(StarSelector),
213 CheckSatisfaction(HashSet<StarConscriptKind>),
214}
215
216#[derive(strum_macros::Display)]
217pub enum StarConscriptResult {
218 Ok,
219 StarConscripts(Vec<StarConscript>),
220 StarConscript(StarConscript),
221 Fail(Fail),
222 Satisfaction(StarConscriptionSatisfaction),
223}
224
225#[derive(Eq, PartialEq, Debug)]
226pub enum StarConscriptionSatisfaction {
227 Ok,
228 Lacking(HashSet<StarKind>),
229}
230
231pub struct StarConscriptDB {
232 pub conn: Connection,
233 pub rx: mpsc::Receiver<StarHandleAction>,
234}
235
236impl StarConscriptDB {
237 pub async fn new() -> mpsc::Sender<StarHandleAction> {
238 let (tx, rx) = mpsc::channel(8 * 1024);
239
240 tokio::spawn(async move {
241 let conn = Connection::open_in_memory();
242 if conn.is_ok() {
243 let mut db = StarConscriptDB {
244 conn: conn.unwrap(),
245 rx: rx,
246 };
247 db.run().await.unwrap()
248 }
249 });
250 tx
251 }
252
253 async fn run(&mut self) -> Result<(), Error> {
254 self.setup()?;
255
256 while let Option::Some(request) = self.rx.recv().await {
257 if let StarConscriptCall::Close = request.command {
258 break;
259 }
260 match self.process(request.command).await {
261 Ok(ok) => {
262 request.tx.send(ok);
263 }
264 Err(fail) => {
265 eprintln!("{}", fail.to_string());
266 request.tx.send(StarConscriptResult::Fail(fail.into()));
267 }
268 }
269 }
270 Ok(())
271 }
272
273 async fn process(&mut self, command: StarConscriptCall) -> Result<StarConscriptResult, Error> {
274 match command {
275 StarConscriptCall::Close => {
276 Ok(StarConscriptResult::Ok)
278 }
279 StarConscriptCall::SetStar(handle) => {
280 let key = handle.key.bin()?;
281 let kind = handle.kind.to_string();
282
283 let trans = self.conn.transaction()?;
284 if handle.hops.is_some() {
285 trans.execute(
286 "REPLACE INTO stars (key,kind,hops) VALUES (?1,?2,?3)",
287 params![key, kind, handle.hops],
288 )?;
289 } else {
290 trans.execute(
291 "REPLACE INTO stars (key,kind) VALUES (?1,?2)",
292 params![key, kind],
293 )?;
294 }
295 trans.commit()?;
296
297 Ok(StarConscriptResult::Ok)
298 }
299 StarConscriptCall::Select(selector) => {
300 let mut params = vec![];
301 let mut where_clause = String::new();
302 let mut param_index = 0;
303
304 for (index, field) in Vec::from_iter(selector.fields.clone())
305 .iter()
306 .map(|x| x.clone())
307 .enumerate()
308 {
309 if index != 0 {
310 where_clause.push_str(" AND ");
311 }
312
313 let f = match &field {
314 StarFieldSelection::Kind(_kind) => {
315 format!("kind=?{}", index + 1)
316 }
317 StarFieldSelection::MinHops => {
318 format!("hops NOT NULL AND hops=MIN(hops)")
319 }
320 };
321
322 where_clause.push_str(f.as_str());
323 if field.is_param() {
324 params.push(field);
325 param_index = param_index + 1;
326 }
327 }
328
329 let statement = if !selector.is_empty() {
331 format!(
332 "SELECT DISTINCT key,kind,hops FROM stars WHERE {}",
333 where_clause
334 )
335 } else {
336 "SELECT DISTINCT key,kind,hops FROM stars".to_string()
337 };
338
339 let mut statement = self.conn.prepare(statement.as_str())?;
340 let mut rows = statement.query(params_from_iter(params.iter()))?;
341
342 let mut handles = vec![];
343 while let Option::Some(row) = rows.next()? {
344 let key: Vec<u8> = row.get(0)?;
345 let key = StarKey::from_bin(key)?;
346
347 let kind: String = row.get(1)?;
348 let kind = StarKind::from_str(kind.as_str())?;
349
350 let hops = if let ValueRef::Null = row.get_ref(2)? {
351 Option::None
352 } else {
353 let hops: usize = row.get(2)?;
354 Option::Some(hops)
355 };
356
357 let handle = StarConscript {
358 key: key,
359 kind: kind,
360 hops: hops,
361 };
362
363 handles.push(handle);
364 }
365 Ok(StarConscriptResult::StarConscripts(handles))
366 }
367 StarConscriptCall::Next(selector) => {
368 let mut params = vec![];
369 let mut where_clause = String::new();
370 let mut param_index = 0;
371
372 for (index, field) in Vec::from_iter(selector.fields.clone())
373 .iter()
374 .map(|x| x.clone())
375 .enumerate()
376 {
377 if index != 0 {
378 where_clause.push_str(" AND ");
379 }
380
381 let f = match &field {
382 StarFieldSelection::Kind(_kind) => {
383 format!("kind=?{}", index + 1)
384 }
385 StarFieldSelection::MinHops => {
386 format!("hops NOT NULL AND hops=MIN(hops)")
387 }
388 };
389
390 where_clause.push_str(f.as_str());
391 if field.is_param() {
392 params.push(field);
393 param_index = param_index + 1;
394 }
395 }
396
397 let statement = if !selector.is_empty() {
399 format!(
400 "SELECT DISTINCT key,kind,hops FROM stars WHERE {} ORDER BY selections",
401 where_clause
402 )
403 } else {
404 "SELECT DISTINCT key,kind,hops FROM stars ORDER BY selections".to_string()
405 };
406
407 let trans = self.conn.transaction()?;
408
409 let handle =
410 trans.query_row(statement.as_str(), params_from_iter(params.iter()), |row| {
411 let key: Vec<u8> = row.get(0)?;
412 let key = StarKey::from_bin(key)?;
413
414 let kind: String = row.get(1)?;
415 let kind = StarKind::from_str(kind.as_str())
416 .map_err(|_| rusqlite::Error::InvalidQuery)?;
417
418 let hops = if let ValueRef::Null = row.get_ref(2)? {
419 Option::None
420 } else {
421 let hops: usize = row.get(2)?;
422 Option::Some(hops)
423 };
424
425 let handle = StarConscript {
426 key: key,
427 kind: kind,
428 hops: hops,
429 };
430
431 Ok(handle)
432 });
433
434 let handle = match handle {
435 Ok(handle) => handle,
436 Err(err) => {
437 match err {
438 rusqlite::Error::QueryReturnedNoRows => {
439 return Err(Fail::SuitableHostNotAvailable(format!(
440 "could not select for: {}",
441 selector.to_string()
442 )).into());
443 }
444 _ => {
445 return Err(err.to_string().into());
446 }
447 };
448 }
449 };
450
451 trans.execute(
452 "UPDATE stars SET selections=selections+1 WHERE key=?1",
453 params![handle.key.bin()?],
454 )?;
455
456 trans.commit()?;
457
458 Ok(StarConscriptResult::StarConscript(handle))
459 }
460
461 StarConscriptCall::CheckSatisfaction(mut kinds) => {
462 let mut lacking = HashSet::new();
463 kinds.retain( |c| c.required );
464 let kinds:Vec<StarKind> = kinds.iter().map(|c|c.kind.clone()).collect();
465
466 for kind in kinds {
467 if !self.conn.query_row(
468 "SELECT count(*) AS count FROM stars WHERE kind=?1",
469 params![kind.to_string()],
470 |row| {
471 let count: usize = row.get(0)?;
472 return Ok(count > 0);
473 },
474 )? {
475 lacking.insert(kind);
476 }
477 }
478 if lacking.is_empty() {
479 Ok(StarConscriptResult::Satisfaction(StarConscriptionSatisfaction::Ok))
480 } else {
481 Ok(StarConscriptResult::Satisfaction(StarConscriptionSatisfaction::Lacking(
482 lacking,
483 )))
484 }
485 }
486 }
487 }
488
489 pub fn setup(&mut self) -> Result<(), Error> {
490 let stars = r#"
491 CREATE TABLE IF NOT EXISTS stars(
492 key BLOB PRIMARY KEY,
493 kind TEXT NOT NULL,
494 hops INTEGER,
495 selections INTEGER NOT NULL DEFAULT 0
496 )"#;
497
498 let transaction = self.conn.transaction()?;
499 transaction.execute(stars, [])?;
500 transaction.commit();
501
502 Ok(())
503 }
504}