1use std::collections::{HashMap, HashSet};
2use std::convert::{TryFrom, TryInto};
3use std::fmt;
4use std::fmt::{Debug, Formatter};
5use std::fs::DirBuilder;
6use std::hash::Hash;
7use std::iter::FromIterator;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11
12use rusqlite::{Connection, params, params_from_iter, Row, ToSql, Transaction};
13use rusqlite::types::{ToSqlOutput, Value, ValueRef};
14use serde::{Deserialize, Serialize};
15use tokio::sync::{mpsc, oneshot};
16use tokio::sync::oneshot::Receiver;
17
18use starlane_resources::{AddressCreationSrc, AssignKind, AssignResourceStateSrc, FieldSelection, KeyCreationSrc, LabelSelection, MetaSelector, ResourceArchetype, ResourceAssign, ResourceCreate, ResourceIdentifier, ResourceRegistryInfo, ResourceSelector, ResourceStub, Unique, Names, ResourcePath, ResourceCreateStrategy, ResourceAction};
19use starlane_resources::ConfigSrc;
20use starlane_resources::data::{BinSrc, DataSet};
21use starlane_resources::message::{Fail, MessageFrom, MessageReply, MessageTo, ProtoMessage, ResourceRequestMessage, ResourceResponseMessage};
22
23use crate::{error, logger, util};
24use crate::error::Error;
25use crate::file_access::FileAccess;
26use crate::frame::{Reply, ReplyKind, ResourceHostAction, SimpleReply, StarMessagePayload};
27use crate::logger::{elog, LogInfo, StaticLogInfo};
28use crate::message::{MessageExpect, ProtoStarMessage};
29use crate::names::Name;
30use crate::star::{ResourceRegistryBacking, StarInfo, StarKey, StarSkel};
31use crate::star::shell::pledge::{ResourceHostSelector, StarConscript};
32use crate::starlane::api::StarlaneApi;
33use crate::util::AsyncHashMap;
34use std::collections::hash_map::RandomState;
35
36pub mod artifact;
37pub mod config;
38pub mod create_args;
39pub mod file;
40pub mod file_system;
41pub mod selector;
42pub mod user;
43
44pub type ResourceType = starlane_resources::ResourceType;
45pub type ResourceAddress = starlane_resources::ResourceAddress;
46pub type Path = starlane_resources::Path;
47pub type DomainCase = starlane_resources::DomainCase;
48pub type SkwerCase = starlane_resources::SkewerCase;
49
50pub type ResourceKind = starlane_resources::ResourceKind;
51pub type DatabaseKind = starlane_resources::DatabaseKind;
52pub type FileKind = starlane_resources::FileKind;
53pub type ArtifactKind = starlane_resources::ArtifactKind;
54
55pub type ResourceKey = starlane_resources::ResourceKey;
56pub type ResourceAddressPart = starlane_resources::ResourcePathSegment;
57pub type ResourceAddressPartKind = starlane_resources::ResourcePathSegmentKind;
58
59pub type RootKey = starlane_resources::RootKey;
60pub type SpaceKey = starlane_resources::SpaceKey;
61pub type AppKey = starlane_resources::AppKey;
62pub type DatabaseKey = starlane_resources::DatabaseKey;
63pub type ActorKey = starlane_resources::MechtronKey;
64pub type ProxyKey = starlane_resources::ProxyKey;
65pub type UserKey = starlane_resources::UserKey;
66pub type ArtifactKey = starlane_resources::ArtifactKey;
67pub type FileSystemKey = starlane_resources::FileSystemKey;
68pub type FileKey = starlane_resources::FileKey;
69pub type Resource = starlane_resources::Resource;
70
71pub type ResourceId = starlane_resources::ResourceId;
72
73pub type ArtifactBundleKey = starlane_resources::ArtifactBundleKey;
74
75pub type Specific = starlane_resources::Specific;
76
77static RESOURCE_QUERY_FIELDS: &str = "r.key,r.address,r.kind,r.specific,r.owner,r.config,r.host";
79
80impl ToSql for Name {
81 fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
82 Ok(ToSqlOutput::Owned(Value::Text(self.to())))
83 }
84}
85
86pub struct ResourceRegistryAction {
87 pub tx: oneshot::Sender<ResourceRegistryResult>,
88 pub command: ResourceRegistryCommand,
89}
90
91impl ResourceRegistryAction {
92 pub fn new(
93 command: ResourceRegistryCommand,
94 ) -> (Self, oneshot::Receiver<ResourceRegistryResult>) {
95 let (tx, rx) = oneshot::channel();
96 (
97 ResourceRegistryAction {
98 tx: tx,
99 command: command,
100 },
101 rx,
102 )
103 }
104}
105
106pub enum ResourceRegistryCommand {
107 Close,
108 Clear,
109 Reserve(ResourceNamesReservationRequest),
111 Commit(ResourceRegistration),
112 Select(ResourceSelector),
113 SetLocation(ResourceRecord),
114 Get(ResourceIdentifier),
115 Next { key: ResourceKey, unique: Unique },
116}
117
118pub enum ResourceRegistryResult {
119 Ok,
120 Error(String),
121 Resource(Option<ResourceRecord>),
122 Resources(Vec<ResourceRecord>),
123 Address(ResourceAddress),
124 Reservation(RegistryReservation),
125 Key(ResourceKey),
126 Unique(u64),
127 NotFound,
128 NotAccepted,
129}
130
131impl ToString for ResourceRegistryResult {
132 fn to_string(&self) -> String {
133 match self {
134 ResourceRegistryResult::Ok => "Ok".to_string(),
135 ResourceRegistryResult::Error(err) => format!("Error({})", err),
136 ResourceRegistryResult::Resource(_) => "Resource".to_string(),
137 ResourceRegistryResult::Resources(_) => "Resources".to_string(),
138 ResourceRegistryResult::Address(_) => "Address".to_string(),
139 ResourceRegistryResult::Reservation(_) => "Reservation".to_string(),
140 ResourceRegistryResult::Key(_) => "Key".to_string(),
141 ResourceRegistryResult::Unique(_) => "Unique".to_string(),
142 ResourceRegistryResult::NotFound => "NotFound".to_string(),
143 ResourceRegistryResult::NotAccepted => "NotAccepted".to_string(),
144 }
145 }
146}
147
148type Blob = Vec<u8>;
149
150struct RegistryParams {
151 key: Option<Blob>,
152 address: Option<String>,
153 resource_type: String,
154 kind: String,
155 specific: Option<String>,
156 config: Option<String>,
157 owner: Option<Blob>,
158 host: Option<Blob>,
159 parent: Option<Blob>,
160}
161
162impl RegistryParams {
163 pub fn from_registration(registration: ResourceRegistration) -> Result<Self, Error> {
164 Self::new(
165 registration.resource.stub.archetype,
166 registration.resource.stub.key.parent(),
167 Option::Some(registration.resource.stub.key),
168 registration.resource.stub.owner,
169 Option::Some(registration.resource.stub.address),
170 Option::Some(registration.resource.location.star),
171 )
172 }
173
174 pub fn from_archetype(
175 archetype: ResourceArchetype,
176 parent: Option<ResourceKey>,
177 ) -> Result<Self, Error> {
178 Self::new(
179 archetype,
180 parent,
181 Option::None,
182 Option::None,
183 Option::None,
184 Option::None,
185 )
186 }
187
188 pub fn new(
189 archetype: ResourceArchetype,
190 parent: Option<ResourceKey>,
191 key: Option<ResourceKey>,
192 owner: Option<UserKey>,
193 address: Option<ResourcePath>,
194 host: Option<StarKey>,
195 ) -> Result<Self, Error> {
196 let key = if let Option::Some(key) = key {
197 Option::Some(key.bin()?)
198 } else {
199 Option::None
200 };
201
202 let address = if let Option::Some(address) = address {
203 Option::Some(address.to_string())
204 } else {
205 Option::None
206 };
207
208 let resource_type = archetype.kind.resource_type().to_string();
209 let kind = archetype.kind.to_string();
210
211 let owner = if let Option::Some(owner) = owner {
212 Option::Some(owner.bin()?)
213 } else {
214 Option::None
215 };
216
217 let specific = match &archetype.specific {
218 None => Option::None,
219 Some(specific) => Option::Some(specific.to_string()),
220 };
221
222 let config = match &archetype.config {
223 None => Option::None,
224 Some(config) => Option::Some(config.to_string()),
225 };
226
227 let parent = match parent {
228 None => Option::None,
229 Some(parent) => Option::Some(parent.bin()?),
230 };
231
232 let host = match host {
233 Some(host) => Option::Some(host.bin()?),
234 None => Option::None,
235 };
236
237 Ok(RegistryParams {
238 key: key,
239 address: address,
240 resource_type: resource_type,
241 kind: kind,
242 specific: specific,
243 parent: parent,
244 config: config,
245 owner: owner,
246 host: host,
247 })
248 }
249}
250
251pub struct Registry {
252 pub conn: Connection,
253 pub tx: mpsc::Sender<ResourceRegistryAction>,
254 pub rx: mpsc::Receiver<ResourceRegistryAction>,
255 star_info: StarInfo,
256}
257
258impl Registry {
259 pub async fn new(star_info: StarInfo, path: String) -> mpsc::Sender<ResourceRegistryAction> {
260 let (tx, rx) = mpsc::channel(8 * 1024);
261 let tx_clone = tx.clone();
262
263 let mut dir_builder = DirBuilder::new();
265 dir_builder.recursive(true);
266 if let Result::Err(_) = dir_builder.create(path.clone()) {
267 eprintln!("FATAL: could not create star data directory: {}", path);
268 return tx;
269 }
270 tokio::spawn(async move {
271 let conn = Connection::open_in_memory();
273 if conn.is_ok() {
274 let mut db = Registry {
275 conn: conn.unwrap(),
276 tx: tx_clone,
277 rx: rx,
278 star_info: star_info,
279 };
280 db.run().await.unwrap();
281 } else {
282 let log_info = StaticLogInfo::new(
283 "ResourceRegistry".to_string(),
284 star_info.log_kind().to_string(),
285 star_info.key.to_string(),
286 );
287 eprintln!("connection ERROR!");
288 logger::elog(
289 &log_info,
290 &star_info,
291 "new()",
292 format!(
293 "ERROR: could not create SqLite connection to database: '{}'",
294 conn.err().unwrap().to_string(),
295 )
296 .as_str(),
297 );
298 }
299 });
300 tx
301 }
302
303 async fn run(&mut self) -> Result<(), Error> {
304 match self.setup() {
305 Ok(_) => {}
306 Err(err) => {
307 eprintln!("error setting up db: {}", err);
308 return Err(err);
309 }
310 };
311
312 while let Option::Some(request) = self.rx.recv().await {
313 if let ResourceRegistryCommand::Close = request.command {
314 break;
315 }
316 match self.process(request.command) {
317 Ok(ok) => {
318 request.tx.send(ok);
319 }
320 Err(err) => {
321 eprintln!("{}", err);
322 request
323 .tx
324 .send(ResourceRegistryResult::Error(err.to_string()));
325 }
326 }
327 }
328
329 Ok(())
330 }
331
332 fn process(
333 &mut self,
334 command: ResourceRegistryCommand,
335 ) -> Result<ResourceRegistryResult, Error> {
336 match command {
337 ResourceRegistryCommand::Close => Ok(ResourceRegistryResult::Ok),
338 ResourceRegistryCommand::Clear => {
339 let trans = self.conn.transaction()?;
340 trans.execute("DELETE FROM labels", [])?;
341 trans.execute("DELETE FROM names", [])?;
342 trans.execute("DELETE FROM resources", [])?;
343 trans.execute("DELETE FROM uniques", [])?;
344 trans.commit()?;
345
346 Ok(ResourceRegistryResult::Ok)
347 }
348
349 ResourceRegistryCommand::Commit(registration) => {
350 let params = RegistryParams::from_registration(registration.clone())?;
351
352 let trans = self.conn.transaction()?;
353
354 if params.key.is_some() {
355 trans.execute(
356 "DELETE FROM labels WHERE labels.resource_key=?1",
357 [params.key.clone()],
358 );
359 trans.execute("DELETE FROM resources WHERE key=?1", [params.key.clone()])?;
360 }
361
362 trans.execute("INSERT INTO resources (key,address,resource_type,kind,specific,parent,owner,config,host) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9)", params![params.key,params.address,params.resource_type,params.kind,params.specific,params.parent,params.owner,params.config,params.host])?;
363 if let Option::Some(info) = registration.info {
364 for name in info.names {
365 trans.execute("UPDATE names SET key=?1 WHERE name=?1", [name])?;
366 }
367 for (name, value) in info.labels {
368 trans.execute(
369 "INSERT INTO labels (resource_key,name,value) VALUES (?1,?2,?3)",
370 params![params.key, name, value],
371 )?;
372 }
373 }
374
375 trans.commit()?;
376 Ok(ResourceRegistryResult::Ok)
377 }
378 ResourceRegistryCommand::Select(selector) => {
379 let mut params: Vec<FieldSelectionSql> = vec![];
380 let mut where_clause = String::new();
381
382 for (index, field) in Vec::from_iter(selector.fields.clone())
383 .iter()
384 .map(|x| x.clone())
385 .enumerate()
386 {
387 if index != 0 {
388 where_clause.push_str(" AND ");
389 }
390
391 let f = match field {
392 FieldSelection::Identifier(_) => {
393 format!("r.key=?{}", index + 1)
394 }
395 FieldSelection::Type(_) => {
396 format!("r.resource_type=?{}", index + 1)
397 }
398 FieldSelection::Kind(_) => {
399 format!("r.kind=?{}", index + 1)
400 }
401 FieldSelection::Specific(_) => {
402 format!("r.specific=?{}", index + 1)
403 }
404 FieldSelection::Owner(_) => {
405 format!("r.owner=?{}", index + 1)
406 }
407 FieldSelection::Parent(_) => {
408 format!("r.parent=?{}", index + 1)
409 }
410 };
411 where_clause.push_str(f.as_str());
412 params.push(field.into());
413 }
414
415 let mut statement = match &selector.meta {
425 MetaSelector::None => {
426 format!(
427 "SELECT DISTINCT {} FROM resources as r WHERE {}",
428 RESOURCE_QUERY_FIELDS, where_clause
429 )
430 }
431 MetaSelector::Label(label_selector) => {
432 let mut labels = String::new();
433 for (_index, label_selection) in
434 Vec::from_iter(label_selector.labels.clone())
435 .iter()
436 .map(|x| x.clone())
437 .enumerate()
438 {
439 if let LabelSelection::Exact(label) = label_selection {
440 labels.push_str(format!(" AND {} IN (SELECT labels.resource_key FROM labels WHERE labels.name='{}' AND labels.value='{}')", RESOURCE_QUERY_FIELDS, label.name, label.value).as_str())
441 }
442 }
443
444 format!(
445 "SELECT DISTINCT {} FROM resources as r WHERE {} {}",
446 RESOURCE_QUERY_FIELDS, where_clause, labels
447 )
448 }
449 MetaSelector::Name(name) => {
450 if where_clause.is_empty() {
451 format!(
452 "SELECT DISTINCT {} FROM names as r WHERE r.name='{}'",
453 RESOURCE_QUERY_FIELDS, name
454 )
455 } else {
456 format!(
457 "SELECT DISTINCT {} FROM names as r WHERE {} AND r.name='{}'",
458 RESOURCE_QUERY_FIELDS, where_clause, name
459 )
460 }
461 }
462 };
463
464 if selector.is_empty() {
466 statement = format!(
467 "SELECT DISTINCT {} FROM resources as r",
468 RESOURCE_QUERY_FIELDS
469 )
470 .to_string();
471 }
472
473 let mut statement = self.conn.prepare(statement.as_str())?;
474 let mut rows = statement.query(params_from_iter(params.iter()))?;
475
476 let mut resources = vec![];
477 while let Option::Some(row) = rows.next()? {
478 resources.push(Self::process_resource_row_catch(row)?);
479 }
480 Ok(ResourceRegistryResult::Resources(resources))
481 }
482 ResourceRegistryCommand::SetLocation(location_record) => {
483 let key = location_record.stub.key.bin()?;
484 let host = location_record.location.star.bin()?;
485 let trans = self.conn.transaction()?;
486 trans.execute(
487 "UPDATE resources SET host=?1 WHERE key=?3",
488 params![host, key],
489 )?;
490 trans.commit()?;
491 Ok(ResourceRegistryResult::Ok)
492 }
493 ResourceRegistryCommand::Get(identifier) => {
494
495 if identifier.is_root() {
496 return Ok(ResourceRegistryResult::Resource(Option::Some(
497 ResourceRecord::root(),
498 )));
499 }
500
501 let result = match &identifier {
502 ResourceIdentifier::Key(key) => {
503 let key = key.bin()?;
504 let statement = format!(
505 "SELECT {} FROM resources as r WHERE key=?1",
506 RESOURCE_QUERY_FIELDS
507 );
508 let mut statement = self.conn.prepare(statement.as_str())?;
509 statement.query_row(params![key], |row| {
510 Ok(Self::process_resource_row_catch(row)?)
511 })
512 }
513 ResourceIdentifier::Address(address) => {
514 let address = address.to_string();
515 let statement = format!(
516 "SELECT {} FROM resources as r WHERE address=?1",
517 RESOURCE_QUERY_FIELDS
518 );
519 let mut statement = self.conn.prepare(statement.as_str())?;
520 statement.query_row(params![address], |row| {
521 Ok(Self::process_resource_row_catch(row)?)
522 })
523 }
524 };
525
526 match result {
527 Ok(record) => Ok(ResourceRegistryResult::Resource(Option::Some(record))),
528 Err(rusqlite::Error::QueryReturnedNoRows) => {
529 Ok(ResourceRegistryResult::Resource(Option::None))
530 }
531 Err(err) => match err {
532 rusqlite::Error::QueryReturnedNoRows => {
533 Ok(ResourceRegistryResult::Resource(Option::None))
534 }
535 err => {
536 eprintln!(
537 "for {} SQL ERROR: {}",
538 identifier.to_string(),
539 err.to_string()
540 );
541 Err(err.into())
542 }
543 },
544 }
545 }
546
547 ResourceRegistryCommand::Reserve(request) => {
548 let trans = self.conn.transaction()?;
549 trans.execute("DELETE FROM names WHERE key IS NULL AND datetime(reservation_timestamp) < datetime('now')", [] )?;
550 let params = RegistryParams::new(
551 request.archetype.clone(),
552 Option::Some(request.parent.clone()),
553 Option::None,
554 Option::None,
555 Option::None,
556 Option::None,
557 )?;
558 if request.info.is_some() {
559 let params = RegistryParams::from_archetype(
560 request.archetype.clone(),
561 Option::Some(request.parent.clone()),
562 )?;
563 Self::process_names(
564 &trans,
565 &request.info.as_ref().cloned().unwrap().names,
566 ¶ms,
567 )?;
568 }
569 trans.commit()?;
570 let (tx, rx) = oneshot::channel();
571 let reservation = RegistryReservation::new(tx);
572 let action_tx = self.tx.clone();
573 let info = request.info.clone();
574 tokio::spawn(async move {
575 let result = rx.await;
576 if let Result::Ok((record, result_tx)) = result {
577 let mut params = params;
578 let key = match record.stub.key.bin() {
579 Ok(key) => Option::Some(key),
580 Err(_) => Option::None,
581 };
582
583 params.key = key;
584 params.address = Option::Some(record.stub.address.to_string());
585 let registration = ResourceRegistration::new(record.clone(), info);
586 let (action, rx) = ResourceRegistryAction::new(
587 ResourceRegistryCommand::Commit(registration),
588 );
589 action_tx.send(action).await;
590 rx.await;
591 result_tx.send(Ok(()));
592 } else if let Result::Err(error) = result {
593 error!(
594 "ERROR: reservation failed to commit due to RecvErr: '{}'",
595 error.to_string()
596 );
597 } else {
598 error!("ERROR: reservation failed to commit.");
599 }
600 });
601 Ok(ResourceRegistryResult::Reservation(reservation))
602 }
603
604 ResourceRegistryCommand::Next { key, unique } => {
605 let trans = self.conn.transaction()?;
606 let key = key.bin()?;
607 let column = match unique {
608 Unique::Sequence => "sequence",
609 Unique::Index => "id_index",
610 };
611
612 trans.execute(
613 "INSERT OR IGNORE INTO uniques (key) VALUES (?1)",
614 params![key],
615 )?;
616 trans.execute(
617 format!("UPDATE uniques SET {}={}+1 WHERE key=?1", column, column).as_str(),
618 params![key],
619 )?;
620 let rtn = trans.query_row(
621 format!("SELECT {} FROM uniques WHERE key=?1", column).as_str(),
622 params![key],
623 |r| {
624 let rtn: u64 = r.get(0)?;
625 Ok(rtn)
626 },
627 )?;
628 trans.commit()?;
629
630 Ok(ResourceRegistryResult::Unique(rtn))
631 }
632 }
633 }
634
635 fn process_resource_row_catch(row: &Row) -> Result<ResourceRecord, Error> {
636 match Self::process_resource_row(row) {
637 Ok(ok) => Ok(ok),
638 Err(error) => {
639 eprintln!("process_resource_rows: {}", error);
640 Err(error)
641 }
642 }
643 }
644
645 fn process_resource_row(row: &Row) -> Result<ResourceRecord, Error> {
646 let key: Vec<u8> = row.get(0)?;
647 let key = ResourceKey::from_bin(key)?;
648
649 let address: String = row.get(1)?;
650 let address = ResourcePath::from_str(address.as_str())?;
651
652 let kind: String = row.get(2)?;
653 let kind = ResourceKind::from_str(kind.as_str())?;
654
655 let specific = if let ValueRef::Null = row.get_ref(3)? {
656 Option::None
657 } else {
658 let specific: String = row.get(3)?;
659 let specific = Specific::from_str(specific.as_str())?;
660 Option::Some(specific)
661 };
662
663 let owner = if let ValueRef::Null = row.get_ref(4)? {
664 Option::None
665 } else {
666 let owner: Vec<u8> = row.get(4)?;
667 let owner: UserKey = UserKey::from_bin(owner)?;
668 Option::Some(owner)
669 };
670
671 let config = if let ValueRef::Null = row.get_ref(5)? {
672 Option::None
673 } else {
674 let config: String = row.get(5)?;
675 let config = ConfigSrc::from_str(config.as_str())?;
676 Option::Some(config)
677 };
678
679 let host: Vec<u8> = row.get(6)?;
680 let host = StarKey::from_bin(host)?;
681
682 let stub = ResourceStub {
683 key: key,
684 address: address,
685 archetype: ResourceArchetype {
686 kind: kind,
687 specific: specific,
688 config: config,
689 },
690 owner: owner,
691 };
692
693 let record = ResourceRecord {
694 stub: stub,
695 location: ResourceLocation { star: host },
696 };
697
698 Ok(record)
699 }
700
701 fn process_names(
702 trans: &Transaction,
703 names: &Names,
704 params: &RegistryParams,
705 ) -> Result<(), Error> {
706 for name in names {
707 trans.execute("INSERT INTO names (key,name,resource_type,kind,specific,parent,owner,config,reservation_timestamp) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,timestamp('now','+5 minutes')", params![params.key,name,params.resource_type,params.kind,params.specific,params.parent,params.owner,params.config])?;
708 }
709 Ok(())
710 }
711
712 pub fn setup(&mut self) -> Result<(), Error> {
713 let labels = r#"
714 CREATE TABLE IF NOT EXISTS labels (
715 key INTEGER PRIMARY KEY AUTOINCREMENT,
716 resource_key BLOB,
717 name TEXT NOT NULL,
718 value TEXT NOT NULL,
719 UNIQUE(key,name),
720 FOREIGN KEY (resource_key) REFERENCES resources (key)
721 )"#;
722
723 let names = r#"
724 CREATE TABLE IF NOT EXISTS names(
725 id INTEGER PRIMARY KEY AUTOINCREMENT,
726 key BLOB,
727 name TEXT NOT NULL,
728 resource_type TEXT NOT NULL,
729 kind BLOB NOT NULL,
730 specific TEXT,
731 parent BLOB,
732 app TEXT,
733 owner BLOB,
734 reservation_timestamp TEXT,
735 UNIQUE(name,resource_type,kind,specific,parent)
736 )"#;
737
738 let resources = r#"CREATE TABLE IF NOT EXISTS resources (
739 key BLOB PRIMARY KEY,
740 address TEXT NOT NULL,
741 resource_type TEXT NOT NULL,
742 kind BLOB NOT NULL,
743 specific TEXT,
744 config TEXT,
745 parent BLOB,
746 owner BLOB,
747 host BLOB
748 )"#;
749
750 let address_index = "CREATE UNIQUE INDEX resource_address_index ON resources(address)";
751
752 let uniques = r#"CREATE TABLE IF NOT EXISTS uniques(
753 key BLOB PRIMARY KEY,
754 sequence INTEGER NOT NULL DEFAULT 0,
755 id_index INTEGER NOT NULL DEFAULT 0
756 )"#;
757
758 let transaction = self.conn.transaction()?;
759 transaction.execute(labels, [])?;
760 transaction.execute(names, [])?;
761 transaction.execute(resources, [])?;
762 transaction.execute(uniques, [])?;
763 transaction.execute(address_index, [])?;
764 transaction.commit()?;
765
766 Ok(())
767 }
768}
769
770impl LogInfo for Registry {
771 fn log_identifier(&self) -> String {
772 self.star_info.log_identifier()
773 }
774
775 fn log_kind(&self) -> String {
776 self.star_info.log_kind()
777 }
778
779 fn log_object(&self) -> String {
780 "Registry".to_string()
781 }
782}
783
784#[async_trait]
785pub trait ResourceIdSeq: Send + Sync {
786 async fn next(&self) -> ResourceId;
787}
788
789#[async_trait]
790pub trait HostedResource: Send + Sync {
791 fn key(&self) -> ResourceKey;
792}
793
794#[derive(Clone)]
795pub struct HostedResourceStore {
796 map: AsyncHashMap<ResourceKey, Arc<LocalHostedResource>>,
797}
798
799impl HostedResourceStore {
800 pub async fn new() -> Self {
801 HostedResourceStore {
802 map: AsyncHashMap::new(),
803 }
804 }
805
806 pub async fn store(&self, resource: Arc<LocalHostedResource>) -> Result<(), Error> {
807 self.map.put(resource.resource.key.clone(), resource).await
808 }
809
810 pub async fn get(&self, key: ResourceKey) -> Result<Option<Arc<LocalHostedResource>>, Error> {
811 self.map.get(key).await
812 }
813
814 pub async fn remove(
815 &self,
816 key: ResourceKey,
817 ) -> Result<Option<Arc<LocalHostedResource>>, Error> {
818 self.map.remove(key).await
819 }
820
821 pub async fn contains(&self, key: &ResourceKey) -> Result<bool, Error> {
822 self.map.contains(key.clone()).await
823 }
824}
825
826#[derive(Clone)]
827pub struct RemoteHostedResource {
828 key: ResourceKey,
829 star_host: StarKey,
830 local_skel: StarSkel,
831}
832
833pub struct LocalHostedResource {
834 pub unique_src: Box<dyn UniqueSrc>,
836 pub resource: ResourceStub,
837}
838impl HostedResource for LocalHostedResource {
839 fn key(&self) -> ResourceKey {
840 self.resource.key.clone()
841 }
842}
843
844#[async_trait]
845pub trait ResourceManager: Send + Sync {
846 async fn create(
847 &self,
848 create: ResourceCreate,
849 ) -> oneshot::Receiver<Result<ResourceRecord, Fail>>;
850}
851
852pub struct RemoteResourceManager {
853 pub key: ResourceKey,
854}
855
856impl RemoteResourceManager {
857 pub fn new(key: ResourceKey) -> Self {
858 RemoteResourceManager { key: key }
859 }
860}
861
862#[async_trait]
863impl ResourceManager for RemoteResourceManager {
864 async fn create(&self, _create: ResourceCreate) -> Receiver<Result<ResourceRecord, Fail>> {
865 unimplemented!();
866 }
867}
868
869#[derive(Clone)]
870pub struct ParentCore {
871 pub skel: StarSkel,
872 pub stub: ResourceStub,
873 pub selector: ResourceHostSelector,
874 pub child_registry: Arc<dyn ResourceRegistryBacking>,
875}
876
877impl Debug for ParentCore {
878 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
879 f.debug_tuple("ParentCore")
880 .field(&self.skel)
881 .field(&self.stub)
882 .finish()
883 }
884}
885
886pub struct Parent {
887 pub core: ParentCore,
888}
889
890impl Parent {
891 #[instrument]
892 async fn create_child(
893 core: ParentCore,
894 create: ResourceCreate,
895 tx: oneshot::Sender<Result<ResourceRecord, Fail>>,
896 ) {
897 let parent = match create
898 .parent
899 .clone()
900 .key_or("expected create.parent to already be a key")
901 {
902 Ok(key) => key,
903 Err(error) => {
904 tx.send(Err(Fail::from(error)));
905 return;
906 }
907 };
908
909 if let Ok(reservation) = core
910 .child_registry
911 .reserve(ResourceNamesReservationRequest {
912 parent: parent,
913 archetype: create.archetype.clone(),
914 info: create.registry_info.clone(),
915 })
916 .await
917 {
918 let rx =
919 ResourceCreationChamber::new(core.stub.clone(), create.clone(), core.skel.clone())
920 .await;
921
922 tokio::spawn(async move {
923
924 match Self::process_action(core.clone(), create.clone(), reservation, rx).await {
925 Ok(resource) => {
926 tx.send(Ok(resource));
927 }
928 Err(fail) => {
929 error!("Failed to create child: FAIL: {}", fail.to_string());
930 tx.send(Err(fail.into()));
931 }
932 }
933 });
934 } else {
935 error!("ERROR: reservation failed.");
936
937 tx.send(Err("RESERVATION FAILED!".into()));
938 }
939 }
940
941 async fn process_action(
942 core: ParentCore,
943 create: ResourceCreate,
944 reservation: RegistryReservation,
945 rx: oneshot::Receiver<
946 Result<ResourceAction<AssignResourceStateSrc<DataSet<BinSrc>>>, Fail>,
947 >,
948 ) -> Result<ResourceRecord, Error> {
949 let action = rx.await??;
950
951 match action {
952 ResourceAction::Create(assign) => {
953 let host = core
954 .selector
955 .select(create.archetype.kind.resource_type())
956 .await?;
957 let record = ResourceRecord::new(assign.stub.clone(), host.star_key());
958 host.assign(assign.clone().try_into()?).await?;
959 let (commit_tx, _commit_rx) = oneshot::channel();
960 reservation.commit(record.clone(), commit_tx)?;
961 host.init(assign.stub.key).await?;
962 Ok(record)
963 }
964 ResourceAction::Update(resource) => {
965 let mut proto = ProtoMessage::new();
967 proto.payload(ResourceRequestMessage::UpdateState(resource.state_src()));
968 proto.to(resource.key.clone().into());
969 proto.from(create.from.clone());
970 let reply = core.skel.messaging_api.exchange(proto.try_into()?, ReplyKind::Empty, "updating the state of a record " ).await;
971 match reply {
972 Ok(reply) => {
973 let record = core.skel.resource_locator_api.locate(resource.key.into()).await;
974 record
975 }
976 Err(err) => {
977 Err(err.into())
978 }
979 }
980
981}
984 }
985
986 }
987
988 }
1095
1096impl LogInfo for ParentCore {
1097 fn log_identifier(&self) -> String {
1098 self.skel.info.log_identifier()
1099 }
1100
1101 fn log_kind(&self) -> String {
1102 self.skel.info.log_kind()
1103 }
1104
1105 fn log_object(&self) -> String {
1106 "Parent".to_string()
1107 }
1108}
1109
1110#[async_trait]
1111impl ResourceManager for Parent {
1112 async fn create(
1113 &self,
1114 create: ResourceCreate,
1115 ) -> oneshot::Receiver<Result<ResourceRecord, Fail>> {
1116 let (tx, rx) = oneshot::channel();
1117
1118 let core = self.core.clone();
1119 tokio::spawn(async move {
1120 Parent::create_child(core, create, tx).await;
1121 });
1122 rx
1123 }
1124}
1125
1126pub struct ResourceCreationChamber {
1127 parent: ResourceStub,
1128 create: ResourceCreate,
1129 skel: StarSkel,
1130 tx: oneshot::Sender<Result<ResourceAction<AssignResourceStateSrc<DataSet<BinSrc>>>, Fail>>,
1131}
1132
1133impl ResourceCreationChamber {
1134 pub async fn new(
1135 parent: ResourceStub,
1136 create: ResourceCreate,
1137 skel: StarSkel,
1138 ) -> oneshot::Receiver<Result<ResourceAction<AssignResourceStateSrc<DataSet<BinSrc>>>, Fail>>
1139 {
1140 let (tx, rx) = oneshot::channel();
1141 let chamber = ResourceCreationChamber {
1142 parent: parent,
1143 create: create,
1144 skel: skel,
1145 tx: tx,
1146 };
1147 chamber.run().await;
1148 rx
1149 }
1150
1151 async fn run(self) {
1152 tokio::spawn(async move {
1153 if !self.create.parent.is_key() {
1154 self.tx.send( Err(Fail::Error("ResourceCreationChamber requires keyed ResourceCreate object. Call ResourceCreate::to_keyed(starlane_api) to modify".to_string())) );
1155 return;
1156 }
1157
1158 if !self
1159 .create
1160 .archetype
1161 .kind
1162 .resource_type()
1163 .parents()
1164 .contains(&self.parent.archetype.kind.resource_type())
1165 {
1166 println!("!!! -> Throwing Fail::WrongParentResourceType for kind {} & ResourceType {} <- !!!", self.create.archetype.kind.to_string(), self.create.archetype.kind.resource_type().to_string() );
1167
1168 self.tx.send(Err(Fail::WrongParentResourceType {
1169 expected: HashSet::from_iter(
1170 self.create.archetype.kind.resource_type().parents(),
1171 ),
1172 received: Option::None,
1173 }));
1174 return;
1175 };
1176
1177 match self.create.validate() {
1178 Ok(_) => {}
1179 Err(error) => {
1180 self.tx.send(Err(error));
1181 return;
1182 }
1183 }
1184
1185 fn create_address( src: &AddressCreationSrc, parent: &ResourcePath ) -> Result<ResourcePath,Error>{
1186 match src {
1187 AddressCreationSrc::Append(tail) => {
1188 Ok(parent.append(tail.as_str() )?)
1189 }
1190 AddressCreationSrc::Just(space_name) => {
1191 Ok(ResourcePath::from_str(space_name.as_str())?)
1192 }
1193 AddressCreationSrc::Exact(address) =>
1194 Ok(address.clone()),
1195 }
1196 }
1197
1198 let address = match create_address( &self.create.address, &self.parent.address ) {
1199 Ok(address) => {address}
1200 Err(err) => {
1201 self.tx.send(Err(err.into()));
1202 return;
1203 }
1204 };
1205
1206 let record = self.skel.resource_locator_api.locate(address.clone().into() ).await;
1207
1208 let key = match record{
1209 Ok(record) => {
1210 match self.create.strategy {
1211 ResourceCreateStrategy::Create => {
1212 self.tx.send(Err(format!("resource with address already exists: '{}'",address.to_string()).into()));
1213 return;
1214 }
1215 ResourceCreateStrategy::Ensure => {
1216 return;
1218 }
1219 ResourceCreateStrategy::CreateOrUpdate => {
1220 if record.stub.archetype != self.create.archetype {
1221 self.tx.send(Err("cannot update a resource with a different archetype (Type,Kind,Specific & ConfigSrc)".into()));
1222 return;
1223 }
1224 match self.create.state_src {
1225 AssignResourceStateSrc::Stateless => {
1226 self.tx.send(Err("cannot update a stateless resource".into()));
1227 return;
1228 }
1229 AssignResourceStateSrc::CreateArgs(_) => {
1230 self.tx.send(Err("cannot execute create-args on an existing resource".into()));
1231 return;
1232 }
1233 AssignResourceStateSrc::Direct(state) => {
1234 let resource = Resource::new(record.stub.key, record.stub.address, record.stub.archetype,state );
1235 self.tx.send(Ok(ResourceAction::Update(resource)) );
1236 return;
1237 }
1238 }
1239
1240 }
1241 }
1242 }
1243 Err(_) => {
1244 let _key = match &self.create.key {
1245 KeyCreationSrc::None => {
1246 let mut proto = ProtoMessage::new();
1247 proto.to(MessageTo::from(self.parent.key.clone()));
1248 proto.from(MessageFrom::Resource(self.parent.key.clone().into()));
1249 proto.payload = Option::Some(ResourceRequestMessage::Unique(
1250 self.create.archetype.kind.resource_type(),
1251 ));
1252
1253 let mut proto_star_message = match proto.try_into() {
1254 Ok(proto_star_message) => proto_star_message,
1255 Err(error) => {
1256 eprintln!(
1257 "ERROR when process proto_star_message from ProtoMessage: {}",
1258 error
1259 );
1260 return;
1261 }
1262 };
1263
1264 let skel = self.skel.clone();
1265
1266 tokio::spawn(async move {
1267 match skel.messaging_api.exchange(proto_star_message, ReplyKind::Id, "ResourceCreationChamber requesting unique id from parent to create unique ResourceKey" ).await
1268 {
1269 Ok(Reply::Id(id)) => {
1270 match ResourceKey::new(self.parent.key.clone(), id.clone()) {
1271 Ok(key) => {
1272 let final_create = self.finalize_create(key.clone(), address.clone() ).await;
1273 self.tx.send(final_create);
1274 return;
1275 }
1276 Err(error) => {
1277 self.tx.send(Err(error.into()));
1278 return;
1279 }
1280 }
1281 }
1282 Err(fail) => self.tx.send(Err(fail.into())).unwrap_or_default(),
1283 _ => {
1284 unimplemented!("ResourceCreationChamber: it should not be possible to get any other message Result other than a Result::Ok(Reply::Id(_)) or Result::Err(Fail) when expecting ReplyKind::Id" )
1285 }
1286 }
1287 });
1288 }
1289 KeyCreationSrc::Key(key) => {
1290 if key.parent() != Option::Some(self.parent.key.clone()) {
1291 let final_create = self.finalize_create(key.clone(), address.clone() ).await;
1292 self.tx.send(final_create);
1293 return;
1294 }
1295 }
1296 };
1297 }
1298 };
1299 });
1300 }
1301
1302 async fn finalize_create(
1303 &self,
1304 key: ResourceKey,
1305 address: ResourcePath,
1306 ) -> Result<ResourceAction<AssignResourceStateSrc<DataSet<BinSrc>>>, Fail> {
1307
1308
1309 let stub = ResourceStub {
1310 key: key,
1311 address: address.clone(),
1312 archetype: self.create.archetype.clone(),
1313 owner: None,
1314 };
1315
1316 let assign = ResourceAssign {
1317 kind: AssignKind::Create,
1318 stub: stub,
1319 state_src: self.create.state_src.clone(),
1320 };
1321 Ok(ResourceAction::Create(assign))
1322 }
1323}
1324
1325#[async_trait]
1326pub trait ResourceHost: Send + Sync {
1327 fn star_key(&self) -> StarKey;
1328 async fn assign(
1329 &self,
1330 assign: ResourceAssign<AssignResourceStateSrc<DataSet<BinSrc>>>,
1331 ) -> Result<(), Error>;
1332
1333 async fn init(
1334 &self,
1335 key: ResourceKey,
1336 ) -> Result<(), Error>;
1337}
1338
1339pub struct ResourceNamesReservationRequest {
1340 pub info: Option<ResourceRegistryInfo>,
1341 pub parent: ResourceKey,
1342 pub archetype: ResourceArchetype,
1343}
1344
1345pub struct RegistryReservation {
1346 tx: Option<oneshot::Sender<(ResourceRecord, oneshot::Sender<Result<(), Fail>>)>>,
1347}
1348
1349impl RegistryReservation {
1350 pub fn commit(
1351 self,
1352 record: ResourceRecord,
1353 result_tx: oneshot::Sender<Result<(), Fail>>,
1354 ) -> Result<(), Fail> {
1355 if let Option::Some(tx) = self.tx {
1356 tx.send((record, result_tx))
1357 .or(Err(Fail::Error("could not send to tx".to_string())));
1358 }
1359 Ok(())
1360 }
1361
1362 pub fn new(tx: oneshot::Sender<(ResourceRecord, oneshot::Sender<Result<(), Fail>>)>) -> Self {
1363 Self {
1364 tx: Option::Some(tx),
1365 }
1366 }
1367
1368 pub fn empty() -> Self {
1369 RegistryReservation { tx: Option::None }
1370 }
1371}
1372
1373
1374
1375pub struct FieldSelectionSql{
1376 selection: FieldSelection
1377}
1378
1379impl From<FieldSelection> for FieldSelectionSql {
1380 fn from(selection: FieldSelection) -> Self {
1381 Self{
1382 selection
1383 }
1384 }
1385}
1386
1387impl ToSql for FieldSelectionSql {
1388 fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
1389 match self.to_sql_error() {
1390 Ok(ok) => Ok(ok),
1391 Err(err) => {
1392 eprintln!("{}", err.to_string());
1393 Err(rusqlite::Error::InvalidQuery)
1394 }
1395 }
1396 }
1397}
1398
1399impl FieldSelectionSql {
1400 fn to_sql_error(&self) -> Result<ToSqlOutput<'_>, error::Error> {
1401 match &self.selection {
1402 FieldSelection::Identifier(id) => Ok(ToSqlOutput::Owned(Value::Blob(id.clone().key_or("(Identifier) selection fields must be turned into ResourceKeys before they can be used by the ResourceRegistry")?.bin()?))),
1403 FieldSelection::Type(resource_type) => {
1404 Ok(ToSqlOutput::Owned(Value::Text(resource_type.to_string())))
1405 }
1406 FieldSelection::Kind(kind) => Ok(ToSqlOutput::Owned(Value::Text(kind.to_string()))),
1407 FieldSelection::Specific(specific) => {
1408 Ok(ToSqlOutput::Owned(Value::Text(specific.to_string())))
1409 }
1410 FieldSelection::Owner(owner) => {
1411 Ok(ToSqlOutput::Owned(Value::Blob(owner.clone().bin()?)))
1412 }
1413 FieldSelection::Parent(parent_id) => Ok(ToSqlOutput::Owned(Value::Blob(parent_id.clone().key_or("(Parent) selection fields must be turned into ResourceKeys before they can be used by the ResourceRegistry")?.bin()?))),
1414 }
1415 }
1416}
1417
1418pub struct RegistryUniqueSrc {
1419 parent_resource_type: ResourceType,
1420 parent_key: ResourceIdentifier,
1421 tx: mpsc::Sender<ResourceRegistryAction>,
1422}
1423
1424impl RegistryUniqueSrc {
1425 pub fn new(parent_resource_type: ResourceType, parent_key: ResourceIdentifier, tx: mpsc::Sender<ResourceRegistryAction>) -> Self {
1426 RegistryUniqueSrc {
1427 parent_resource_type,
1428 parent_key: parent_key,
1429 tx: tx,
1430 }
1431 }
1432}
1433
1434#[async_trait]
1435impl UniqueSrc for RegistryUniqueSrc {
1436 async fn next(&self, resource_type: &ResourceType) -> Result<ResourceId, Error> {
1437 if !resource_type
1438 .parents()
1439 .contains(&self.parent_resource_type)
1440 {
1441 eprintln!("WRONG RESOURCE TYPE IN UNIQUE SRC");
1442 return Err(Fail::WrongResourceType {
1443 expected: HashSet::new(),
1445 received: resource_type.clone(),
1446 }.into());
1447 }
1448 let (tx, rx) = oneshot::channel();
1449
1450 let parent_key = match &self.parent_key {
1451 ResourceIdentifier::Key(key) => key.clone(),
1452 ResourceIdentifier::Address(address) => {
1453 let (tx, rx) = oneshot::channel();
1454 self.tx
1455 .send(ResourceRegistryAction {
1456 tx: tx,
1457 command: ResourceRegistryCommand::Get(address.clone().into()),
1458 })
1459 .await?;
1460 if let ResourceRegistryResult::Resource(Option::Some(record)) = rx.await? {
1461 record.stub.key
1462 } else {
1463 return Err(
1464 format!("could not find key for address: {}", address.to_string()).into(),
1465 );
1466 }
1467 }
1468 };
1469
1470 self.tx
1471 .send(ResourceRegistryAction {
1472 tx: tx,
1473 command: ResourceRegistryCommand::Next {
1474 key: parent_key.clone(),
1475 unique: Unique::Index,
1476 },
1477 })
1478 .await?;
1479
1480 match rx.await? {
1481 ResourceRegistryResult::Unique(index) => Ok(resource_type.to_resource_id(index as _)),
1482 what => Err(Fail::Unexpected {
1483 expected: "ResourceRegistryResult::Unique".to_string(),
1484 received: what.to_string(),
1485 }.into()),
1486 }
1487 }
1488}
1489
1490#[derive(Debug, Clone, Serialize, Deserialize)]
1491pub struct ResourceRegistration {
1492 pub resource: ResourceRecord,
1493 pub info: Option<ResourceRegistryInfo>,
1494}
1495
1496impl ResourceRegistration {
1497 pub fn new(resource: ResourceRecord, info: Option<ResourceRegistryInfo>) -> Self {
1498 ResourceRegistration {
1499 resource: resource,
1500 info: info,
1501 }
1502 }
1503}
1504
1505#[derive(Debug, Clone, Serialize, Deserialize)]
1506pub struct ResourceLocationAffinity {
1507 pub star: StarKey,
1508}
1509
1510impl From<ResourceRecord> for ResourceKey {
1511 fn from(record: ResourceRecord) -> Self {
1512 record.stub.key
1513 }
1514}
1515
1516
1517
1518pub enum ResourceManagerKey {
1519 Central,
1520 Key(ResourceKey),
1521}
1522
1523#[derive(Clone, Serialize, Deserialize)]
1949pub struct ResourceBinding {
1950 pub key: ResourceKey,
1951 pub address: ResourceAddress,
1952}
1953
1954#[derive(Debug,Clone, Serialize, Deserialize)]
1955pub struct ResourceLocation {
1956 pub star: StarKey
1957}
1958
1959impl ResourceLocation {
1960 pub fn new(star: StarKey) -> Self {
1961 Self{
1962 star
1963 }
1964 }
1965 pub fn root()-> Self {
1966 Self{
1967 star: StarKey::central()
1968 }
1969 }
1970}
1971
1972#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
1973pub struct DisplayValue {
1974 string: String,
1975}
1976
1977impl DisplayValue {
1978 pub fn new(string: &str) -> Result<Self, Error> {
1979 if string.is_empty() {
1980 return Err("cannot be empty".into());
1981 }
1982
1983 Ok(DisplayValue {
1984 string: string.to_string(),
1985 })
1986 }
1987}
1988
1989impl ToString for DisplayValue {
1990 fn to_string(&self) -> String {
1991 self.string.clone()
1992 }
1993}
1994
1995impl FromStr for DisplayValue {
1996 type Err = Error;
1997
1998 fn from_str(s: &str) -> Result<Self, Self::Err> {
1999 Ok(DisplayValue::new(s)?)
2000 }
2001}
2002
2003#[derive(Clone, Serialize, Deserialize, Eq, PartialEq)]
2004pub enum ResourceSliceStatus {
2005 Unknown,
2006 Preparing,
2007 Waiting,
2008 Ready,
2009}
2010
2011impl ToString for ResourceSliceStatus {
2012 fn to_string(&self) -> String {
2013 match self {
2014 ResourceSliceStatus::Unknown => "Unknown".to_string(),
2015 ResourceSliceStatus::Preparing => "Preparing".to_string(),
2016 ResourceSliceStatus::Waiting => "Waiting".to_string(),
2017 ResourceSliceStatus::Ready => "Ready".to_string(),
2018 }
2019 }
2020}
2021
2022impl FromStr for ResourceSliceStatus {
2023 type Err = Error;
2024
2025 fn from_str(s: &str) -> Result<Self, Self::Err> {
2026 match s {
2027 "Unknown" => Ok(Self::Unknown),
2028 "Preparing" => Ok(Self::Preparing),
2029 "Waiting" => Ok(Self::Waiting),
2030 "Ready" => Ok(Self::Ready),
2031 what => Err(format!("not recognized: {}", what).into()),
2032 }
2033 }
2034}
2035
2036#[derive(Clone, Serialize, Deserialize)]
2037pub struct ResourceSliceAssign {
2038 key: ResourceKey,
2039 archetype: ResourceArchetype,
2040}
2041
2042pub struct RemoteResourceHost {
2058 pub skel: StarSkel,
2059 pub handle: StarConscript,
2060}
2061
2062#[async_trait]
2063impl ResourceHost for RemoteResourceHost {
2064 fn star_key(&self) -> StarKey {
2065 self.handle.key.clone()
2066 }
2067
2068 async fn assign(
2069 &self,
2070 assign: ResourceAssign<AssignResourceStateSrc<DataSet<BinSrc>>>,
2071 ) -> Result<(), Error> {
2072 if !self
2073 .handle
2074 .kind
2075 .hosted()
2076 .contains(&assign.stub.key.resource_type())
2077 {
2078 return Err(Fail::WrongResourceType {
2079 expected: self.handle.kind.hosted().clone(),
2080 received: assign.stub.key.resource_type().clone(),
2081 }.into());
2082 }
2083
2084 let mut proto = ProtoStarMessage::new();
2085 proto.to = self.handle.key.clone().into();
2086 proto.payload =
2087 StarMessagePayload::ResourceHost(ResourceHostAction::Assign(assign.try_into()?));
2088
2089 self.skel
2090 .messaging_api
2091 .exchange(
2092 proto,
2093 ReplyKind::Empty,
2094 "RemoteResourceHost: assign resource to host",
2095 )
2096 .await?;
2097
2098 Ok(())
2099 }
2100
2101 async fn init(&self, key: ResourceKey) -> Result<(), Error> {
2102 let mut proto = ProtoStarMessage::new();
2103 proto.to = self.handle.key.clone().into();
2104 proto.payload =
2105 StarMessagePayload::ResourceHost(ResourceHostAction::Init(key));
2106
2107 self.skel
2108 .messaging_api
2109 .exchange(
2110 proto,
2111 ReplyKind::Empty,
2112 "RemoteResourceHost: create resource on host",
2113 )
2114 .await?;
2115
2116 Ok(())
2117 }
2118}
2119
2120pub trait ResourceSelectorId:
2121 Debug
2122 + Clone
2123 + Serialize
2124 + for<'de> Deserialize<'de>
2125 + Eq
2126 + PartialEq
2127 + Hash
2128 + Into<ResourceIdentifier>
2129 + Sized
2130{
2131}
2132
2133#[async_trait]
2134pub trait UniqueSrc: Send + Sync {
2135 async fn next(&self, resource_type: &ResourceType) -> Result<ResourceId, Error>;
2136}
2137
2138#[derive(Debug, Clone, Serialize, Deserialize)]
2139pub struct ResourceRecord {
2140 pub stub: ResourceStub,
2141 pub location: ResourceLocation,
2142}
2143
2144impl ResourceRecord {
2145 pub fn new(stub: ResourceStub, host: StarKey) -> Self {
2146 ResourceRecord {
2147 stub: stub,
2148 location: ResourceLocation::new(host),
2149 }
2150 }
2151
2152 pub fn root() -> Self {
2153 Self {
2154 stub: ResourceStub::root(),
2155 location: ResourceLocation::root(),
2156 }
2157 }
2158}
2159
2160impl Into<ResourceStub> for ResourceRecord {
2161 fn into(self) -> ResourceStub {
2162 self.stub
2163 }
2164}
2165
2166 pub async fn to_keyed_for_field_selection( selection: FieldSelection, starlane_api: &StarlaneApi) -> Result<FieldSelection, Error> {
2167 match selection{
2168 FieldSelection::Identifier(id) => Ok(FieldSelection::Identifier(
2169 starlane_api.to_key(id).await?.into(),
2170 )),
2171 FieldSelection::Type(resource_type) => Ok(FieldSelection::Type(resource_type)),
2172 FieldSelection::Kind(kind) => Ok(FieldSelection::Kind(kind)),
2173 FieldSelection::Specific(specific) => Ok(FieldSelection::Specific(specific)),
2174 FieldSelection::Owner(owner) => Ok(FieldSelection::Owner(owner)),
2175 FieldSelection::Parent(id) => Ok(FieldSelection::Parent(
2176 starlane_api.to_key(id).await?.into(),
2177 )),
2178 }
2179 }
2180
2181
2182pub async fn to_keyed_for_reasource_create(create: ResourceCreate, starlane_api: StarlaneApi) -> Result<ResourceCreate, Error> {
2183 Ok(ResourceCreate{
2184 parent: starlane_api.to_key(create.parent).await?.into(),
2185 key: create.key,
2186 address: create.address,
2187 archetype: create.archetype,
2188 state_src: create.state_src,
2189 registry_info: create.registry_info,
2190 owner: create.owner,
2191 strategy: create.strategy,
2192 from: create.from
2193 })
2194}
2195
2196pub async fn to_keyed_for_resource_selector(selector: ResourceSelector, starlane_api: StarlaneApi) -> Result<ResourceSelector, Error> {
2197 let mut fields: HashSet<FieldSelection> = HashSet::new();
2198
2199 for field in selector.fields {
2200 fields.insert(to_keyed_for_field_selection(field,&starlane_api).await?.into());
2201 }
2202
2203 Ok(ResourceSelector {
2204 meta: selector.meta,
2205 fields: fields,
2206 })
2207}