starlane_core/star/shell/
pledge.rs

1use std::collections::HashSet;
2use std::iter::FromIterator;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use futures::TryFutureExt;
7use rusqlite::{Connection, params, params_from_iter, ToSql};
8use rusqlite::types::{ToSqlOutput, Value, ValueRef};
9use tokio::sync::{mpsc, oneshot};
10use tokio::time::Duration;
11
12use starlane_resources::message::Fail;
13use starlane_resources::ResourceAssign;
14
15use crate::error::Error;
16use crate::resource::{
17    RemoteResourceHost, ResourceHost, ResourceLocationAffinity, ResourceType,
18};
19use crate::star::{StarCommand, StarConscriptKind, StarInfo, StarKey, StarKind, StarSkel};
20
21#[derive(Clone)]
22pub struct StarWranglerBacking {
23    tx: mpsc::Sender<StarHandleAction>,
24    star_tx: mpsc::Sender<StarCommand>,
25}
26
27impl StarWranglerBacking {
28    pub async fn new(star_tx: mpsc::Sender<StarCommand>) -> Self {
29        StarWranglerBacking {
30            tx: StarConscriptDB::new().await,
31            star_tx: star_tx,
32        }
33    }
34
35    pub async fn add_star_handle(&self, handle: StarConscript) -> Result<(), Error> {
36        let (action, rx) = StarHandleAction::new(StarConscriptCall::SetStar(handle));
37        self.tx.send(action).await?;
38        tokio::time::timeout(Duration::from_secs(5), rx).await??;
39        self.star_tx.send(StarCommand::CheckStatus).await;
40        Ok(())
41    }
42
43    pub async fn select(&self, selector: StarSelector) -> Result<Vec<StarConscript>, Error> {
44        let (action, rx) = StarHandleAction::new(StarConscriptCall::Select(selector));
45        self.tx.send(action).await?;
46        let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
47        match result {
48            StarConscriptResult::StarConscripts(handles) => Ok(handles),
49            _what => Err(Fail::expected("StarHandleResult::StarHandles(handles)").into()),
50        }
51    }
52
53    pub async fn next(&self, selector: StarSelector) -> Result<StarConscript, Error> {
54        let (action, rx) = StarHandleAction::new(StarConscriptCall::Next(selector));
55        self.tx.send(action).await?;
56        let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
57        match result {
58            StarConscriptResult::StarConscript(handle) => Ok(handle),
59            _what => Err(Fail::expected("StarHandleResult::StarHandle(handle)").into()),
60        }
61    }
62
63    // must have at least one of each StarKind
64    pub async fn satisfied(&self, set: HashSet<StarConscriptKind>) -> Result<StarConscriptionSatisfaction, Error> {
65        let (action, rx) = StarHandleAction::new(StarConscriptCall::CheckSatisfaction(set));
66        self.tx.send(action).await?;
67        let result = tokio::time::timeout(Duration::from_secs(5), rx).await??;
68        match result {
69            StarConscriptResult::Satisfaction(satisfaction) => Ok(satisfaction),
70            _what => Err(Fail::expected("StarHandleResult::Satisfaction(_)").into()),
71        }
72    }
73}
74
75#[derive(Debug, Clone)]
76pub struct ResourceHostSelector {
77    skel: StarSkel,
78}
79
80impl ResourceHostSelector {
81    pub fn new(skel: StarSkel) -> Self {
82        ResourceHostSelector { skel: skel }
83    }
84
85    pub async fn select(&self, resource_type: ResourceType) -> Result<Arc<dyn ResourceHost>, Error> {
86        if StarKind::hosts(&resource_type) == self.skel.info.kind {
87            let handle = StarConscript {
88                key: self.skel.info.key.clone(),
89                kind: self.skel.info.kind.clone(),
90                hops: None,
91            };
92            let host = RemoteResourceHost {
93                skel: self.skel.clone(),
94                handle: handle,
95            };
96            Ok(Arc::new(host))
97        } else {
98            let handler = self.skel.star_handler.as_ref().ok_or(format!(
99                "non-manager star {} does not have a host star selector",
100                self.skel.info.kind.to_string()
101            ))?;
102            let mut selector = StarSelector::new();
103            selector.add(StarFieldSelection::Kind(StarKind::hosts(&resource_type)));
104            let handle = handler.next(selector).await?;
105
106            let host = RemoteResourceHost {
107                skel: self.skel.clone(),
108                handle: handle,
109            };
110
111            Ok(Arc::new(host))
112        }
113    }
114}
115
116pub struct StarConscript {
117    pub key: StarKey,
118    pub kind: StarKind,
119    pub hops: Option<usize>,
120}
121
122pub struct StarSelector {
123    fields: HashSet<StarFieldSelection>,
124}
125
126impl ToString for StarSelector {
127    fn to_string(&self) -> String {
128        let mut rtn = String::new();
129
130        for (index, field) in self.fields.iter().enumerate() {
131            if index > 0 {
132                rtn.push_str(", ");
133            }
134            rtn.push_str(field.to_string().as_str());
135        }
136
137        rtn
138    }
139}
140
141impl StarSelector {
142    pub fn new() -> Self {
143        StarSelector {
144            fields: HashSet::new(),
145        }
146    }
147    pub fn is_empty(&self) -> bool {
148        self.fields.is_empty()
149    }
150
151    pub fn add(&mut self, field: StarFieldSelection) {
152        self.fields.insert(field);
153    }
154}
155
156#[derive(Clone, Hash, Eq, PartialEq)]
157pub enum StarFieldSelection {
158    Kind(StarKind),
159    MinHops,
160}
161
162impl ToString for StarFieldSelection {
163    fn to_string(&self) -> String {
164        match self {
165            StarFieldSelection::Kind(kind) => format!("Kind:{}", kind.to_string()),
166            StarFieldSelection::MinHops => format!("MinHops"),
167        }
168    }
169}
170
171impl ToSql for StarFieldSelection {
172    fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
173        match self {
174            StarFieldSelection::Kind(kind) => Ok(ToSqlOutput::Owned(Value::Text(kind.to_string()))),
175            StarFieldSelection::MinHops => Ok(ToSqlOutput::Owned(Value::Null)),
176        }
177    }
178}
179
180impl StarFieldSelection {
181    pub fn is_param(&self) -> bool {
182        match self {
183            StarFieldSelection::Kind(_) => true,
184            StarFieldSelection::MinHops => false,
185        }
186    }
187}
188
189pub struct StarHandleAction {
190    pub command: StarConscriptCall,
191    pub tx: oneshot::Sender<StarConscriptResult>,
192}
193
194impl StarHandleAction {
195    pub fn new(command: StarConscriptCall) -> (Self, oneshot::Receiver<StarConscriptResult>) {
196        let (tx, rx) = oneshot::channel();
197        (
198            StarHandleAction {
199                tx: tx,
200                command: command,
201            },
202            rx,
203        )
204    }
205}
206
207#[derive(strum_macros::Display)]
208pub enum StarConscriptCall {
209    Close,
210    SetStar(StarConscript),
211    Select(StarSelector),
212    Next(StarSelector),
213    CheckSatisfaction(HashSet<StarConscriptKind>),
214}
215
216#[derive(strum_macros::Display)]
217pub enum StarConscriptResult {
218    Ok,
219    StarConscripts(Vec<StarConscript>),
220    StarConscript(StarConscript),
221    Fail(Fail),
222    Satisfaction(StarConscriptionSatisfaction),
223}
224
225#[derive(Eq, PartialEq, Debug)]
226pub enum StarConscriptionSatisfaction {
227    Ok,
228    Lacking(HashSet<StarKind>),
229}
230
231pub struct StarConscriptDB {
232    pub conn: Connection,
233    pub rx: mpsc::Receiver<StarHandleAction>,
234}
235
236impl StarConscriptDB {
237    pub async fn new() -> mpsc::Sender<StarHandleAction> {
238        let (tx, rx) = mpsc::channel(8 * 1024);
239
240        tokio::spawn(async move {
241            let conn = Connection::open_in_memory();
242            if conn.is_ok() {
243                let mut db = StarConscriptDB {
244                    conn: conn.unwrap(),
245                    rx: rx,
246                };
247                db.run().await.unwrap()
248            }
249        });
250        tx
251    }
252
253    async fn run(&mut self) -> Result<(), Error> {
254        self.setup()?;
255
256        while let Option::Some(request) = self.rx.recv().await {
257            if let StarConscriptCall::Close = request.command {
258                break;
259            }
260            match self.process(request.command).await {
261                Ok(ok) => {
262                    request.tx.send(ok);
263                }
264                Err(fail) => {
265                    eprintln!("{}", fail.to_string());
266                    request.tx.send(StarConscriptResult::Fail(fail.into()));
267                }
268            }
269        }
270        Ok(())
271    }
272
273    async fn process(&mut self, command: StarConscriptCall) -> Result<StarConscriptResult, Error> {
274        match command {
275            StarConscriptCall::Close => {
276                // this is handle in the run() method
277                Ok(StarConscriptResult::Ok)
278            }
279            StarConscriptCall::SetStar(handle) => {
280                let key = handle.key.bin()?;
281                let kind = handle.kind.to_string();
282
283                let trans = self.conn.transaction()?;
284                if handle.hops.is_some() {
285                    trans.execute(
286                        "REPLACE INTO stars (key,kind,hops) VALUES (?1,?2,?3)",
287                        params![key, kind, handle.hops],
288                    )?;
289                } else {
290                    trans.execute(
291                        "REPLACE INTO stars (key,kind) VALUES (?1,?2)",
292                        params![key, kind],
293                    )?;
294                }
295                trans.commit()?;
296
297                Ok(StarConscriptResult::Ok)
298            }
299            StarConscriptCall::Select(selector) => {
300                let mut params = vec![];
301                let mut where_clause = String::new();
302                let mut param_index = 0;
303
304                for (index, field) in Vec::from_iter(selector.fields.clone())
305                    .iter()
306                    .map(|x| x.clone())
307                    .enumerate()
308                {
309                    if index != 0 {
310                        where_clause.push_str(" AND ");
311                    }
312
313                    let f = match &field {
314                        StarFieldSelection::Kind(_kind) => {
315                            format!("kind=?{}", index + 1)
316                        }
317                        StarFieldSelection::MinHops => {
318                            format!("hops NOT NULL AND hops=MIN(hops)")
319                        }
320                    };
321
322                    where_clause.push_str(f.as_str());
323                    if field.is_param() {
324                        params.push(field);
325                        param_index = param_index + 1;
326                    }
327                }
328
329                // in case this search was for EVERYTHING
330                let statement = if !selector.is_empty() {
331                    format!(
332                        "SELECT DISTINCT key,kind,hops  FROM stars WHERE {}",
333                        where_clause
334                    )
335                } else {
336                    "SELECT DISTINCT key,kind,hops  FROM stars".to_string()
337                };
338
339                let mut statement = self.conn.prepare(statement.as_str())?;
340                let mut rows = statement.query(params_from_iter(params.iter()))?;
341
342                let mut handles = vec![];
343                while let Option::Some(row) = rows.next()? {
344                    let key: Vec<u8> = row.get(0)?;
345                    let key = StarKey::from_bin(key)?;
346
347                    let kind: String = row.get(1)?;
348                    let kind = StarKind::from_str(kind.as_str())?;
349
350                    let hops = if let ValueRef::Null = row.get_ref(2)? {
351                        Option::None
352                    } else {
353                        let hops: usize = row.get(2)?;
354                        Option::Some(hops)
355                    };
356
357                    let handle = StarConscript {
358                        key: key,
359                        kind: kind,
360                        hops: hops,
361                    };
362
363                    handles.push(handle);
364                }
365                Ok(StarConscriptResult::StarConscripts(handles))
366            }
367            StarConscriptCall::Next(selector) => {
368                let mut params = vec![];
369                let mut where_clause = String::new();
370                let mut param_index = 0;
371
372                for (index, field) in Vec::from_iter(selector.fields.clone())
373                    .iter()
374                    .map(|x| x.clone())
375                    .enumerate()
376                {
377                    if index != 0 {
378                        where_clause.push_str(" AND ");
379                    }
380
381                    let f = match &field {
382                        StarFieldSelection::Kind(_kind) => {
383                            format!("kind=?{}", index + 1)
384                        }
385                        StarFieldSelection::MinHops => {
386                            format!("hops NOT NULL AND hops=MIN(hops)")
387                        }
388                    };
389
390                    where_clause.push_str(f.as_str());
391                    if field.is_param() {
392                        params.push(field);
393                        param_index = param_index + 1;
394                    }
395                }
396
397                // in case this search was for EVERYTHING
398                let statement = if !selector.is_empty() {
399                    format!(
400                        "SELECT DISTINCT key,kind,hops  FROM stars WHERE {} ORDER BY selections",
401                        where_clause
402                    )
403                } else {
404                    "SELECT DISTINCT key,kind,hops  FROM stars ORDER BY selections".to_string()
405                };
406
407                let trans = self.conn.transaction()?;
408
409                let handle =
410                    trans.query_row(statement.as_str(), params_from_iter(params.iter()), |row| {
411                        let key: Vec<u8> = row.get(0)?;
412                        let key = StarKey::from_bin(key)?;
413
414                        let kind: String = row.get(1)?;
415                        let kind = StarKind::from_str(kind.as_str())
416                            .map_err(|_| rusqlite::Error::InvalidQuery)?;
417
418                        let hops = if let ValueRef::Null = row.get_ref(2)? {
419                            Option::None
420                        } else {
421                            let hops: usize = row.get(2)?;
422                            Option::Some(hops)
423                        };
424
425                        let handle = StarConscript {
426                            key: key,
427                            kind: kind,
428                            hops: hops,
429                        };
430
431                        Ok(handle)
432                    });
433
434                let handle = match handle {
435                    Ok(handle) => handle,
436                    Err(err) => {
437                        match err {
438                            rusqlite::Error::QueryReturnedNoRows => {
439                                return Err(Fail::SuitableHostNotAvailable(format!(
440                                    "could not select for: {}",
441                                    selector.to_string()
442                                )).into());
443                            }
444                            _ => {
445                                return Err(err.to_string().into());
446                            }
447                        };
448                    }
449                };
450
451                trans.execute(
452                    "UPDATE stars SET selections=selections+1 WHERE key=?1",
453                    params![handle.key.bin()?],
454                )?;
455
456                trans.commit()?;
457
458                Ok(StarConscriptResult::StarConscript(handle))
459            }
460
461            StarConscriptCall::CheckSatisfaction(mut kinds) => {
462                let mut lacking = HashSet::new();
463                kinds.retain( |c| c.required );
464                let kinds:Vec<StarKind> = kinds.iter().map(|c|c.kind.clone()).collect();
465
466                for kind in kinds {
467                    if !self.conn.query_row(
468                        "SELECT count(*) AS count FROM stars WHERE kind=?1",
469                        params![kind.to_string()],
470                        |row| {
471                            let count: usize = row.get(0)?;
472                            return Ok(count > 0);
473                        },
474                    )? {
475                        lacking.insert(kind);
476                    }
477                }
478                if lacking.is_empty() {
479                    Ok(StarConscriptResult::Satisfaction(StarConscriptionSatisfaction::Ok))
480                } else {
481                    Ok(StarConscriptResult::Satisfaction(StarConscriptionSatisfaction::Lacking(
482                        lacking,
483                    )))
484                }
485            }
486        }
487    }
488
489    pub fn setup(&mut self) -> Result<(), Error> {
490        let stars = r#"
491       CREATE TABLE IF NOT EXISTS stars(
492	      key BLOB PRIMARY KEY,
493	      kind TEXT NOT NULL,
494	      hops INTEGER,
495	      selections INTEGER NOT NULL DEFAULT 0
496        )"#;
497
498        let transaction = self.conn.transaction()?;
499        transaction.execute(stars, [])?;
500        transaction.commit();
501
502        Ok(())
503    }
504}