autoschematic_core/connector.rs
1use std::{
2 collections::HashMap,
3 ffi::OsString,
4 fs::File,
5 io::BufReader,
6 path::{Component, Path, PathBuf},
7 sync::Arc,
8};
9
10use anyhow::bail;
11use documented::{Documented, DocumentedFields};
12use serde::{Deserialize, Serialize};
13
14use async_trait::async_trait;
15
16use crate::{bundle::UnbundleResponseElement, macros::FieldTypes, template::ReadOutput};
17
18pub use crate::diag::DiagnosticResponse;
19
/// Plan-time declaration of what a ConnectorOp intends to do with one named output value.
///
/// ConnectorOps output by `Connector::plan()` may declare a set of output values
/// that they will set or delete on execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OutputValuePlan {
    /// The op will set this output value when it is executed.
    Set,
    /// The op will delete this output value when it is executed.
    Delete,
}
27
/// Execution-time result for a single output key: `Some(value)` sets the key and
/// `None` removes it (this is how `OutputMapFile::apply_output_map` interprets it).
pub type OutputValueExec = Option<String>;

/// Plan-time output declarations, keyed by output name.
pub type OutputMapPlan = HashMap<String, OutputValuePlan>;
/// Execution-time output changes, keyed by output name.
pub type OutputMapExec = HashMap<String, OutputValueExec>;

/// The output values stored for a resource, keyed by output name.
pub type OutputMap = HashMap<String, String>;
34
/// The contents of an output file as stored on disk (serialized as JSON by the
/// methods on this type).
#[derive(Serialize, Deserialize)]
pub enum OutputMapFile {
    /// A link: this file holds no values itself and instead names another address
    /// whose output file should be consulted.
    PointerToVirtual(PathBuf),
    /// The actual output key/value pairs.
    OutputMap(OutputMap),
}
40
41impl OutputMapFile {
42 pub fn path(prefix: &Path, addr: &Path) -> PathBuf {
43 let mut output = PathBuf::from(".autoschematic");
44 // let mut output = prefix.to_path_buf();
45
46 output.push(prefix);
47
48 // Join the parent portion of `addr`, if it exists
49 if let Some(parent) = addr.parent() {
50 // Guard against pathological cases like ".." or "." parents
51 // by only pushing normal components
52 for comp in parent.components() {
53 if let Component::Normal(_) = comp {
54 output.push(comp)
55 }
56 }
57 }
58
59 let mut new_filename = OsString::new();
60 if let Some(fname) = addr.file_name() {
61 new_filename.push(fname);
62 } else {
63 // If there's no file name at all, we'll just use ".out.json"
64 // so `new_filename` right now is just "." - that's fine.
65 // We'll end up producing something like "./office/east/ec2/us-east-1/.out.json"
66 }
67 new_filename.push(".out.json");
68
69 output.push(new_filename);
70
71 output
72 }
73
74 pub fn read(prefix: &Path, addr: &Path) -> anyhow::Result<Option<Self>> {
75 let output_path = Self::path(prefix, addr);
76
77 if output_path.is_file() {
78 let file = File::open(&output_path)?;
79 let reader = BufReader::new(file);
80
81 let output: Self = serde_json::from_reader(reader)?;
82
83 return Ok(Some(output));
84 }
85
86 Ok(None)
87 }
88
89 pub fn read_recurse(prefix: &Path, addr: &Path) -> anyhow::Result<Option<Self>> {
90 let output_path = Self::path(prefix, addr);
91
92 if output_path.is_file() {
93 let file = File::open(&output_path)?;
94 let reader = BufReader::new(file);
95
96 let output: Self = serde_json::from_reader(reader)?;
97
98 match &output {
99 OutputMapFile::PointerToVirtual(virt_addr) => {
100 return Self::read_recurse(prefix, virt_addr);
101 }
102 OutputMapFile::OutputMap(_) => return Ok(Some(output)),
103 }
104 }
105
106 Ok(None)
107 }
108
109 pub fn write(&self, prefix: &Path, addr: &Path) -> anyhow::Result<PathBuf> {
110 let output_path = Self::path(prefix, addr);
111
112 let contents = serde_json::to_string_pretty(self)?;
113
114 if let Some(parent) = output_path.parent() {
115 std::fs::create_dir_all(parent)?;
116 }
117 if output_path.exists() {
118 std::fs::remove_file(&output_path)?;
119 }
120
121 std::fs::write(&output_path, contents)?;
122
123 Ok(output_path)
124 }
125
126 pub fn write_recurse(&self, prefix: &Path, addr: &Path) -> anyhow::Result<()> {
127 let output_path = Self::path(prefix, addr);
128
129 if output_path.is_file() {
130 let contents = std::fs::read_to_string(&output_path)?;
131
132 let output: Self = serde_json::from_str(&contents)?;
133
134 match &output {
135 OutputMapFile::PointerToVirtual(virtual_address) => {
136 return self.write_recurse(prefix, virtual_address);
137 }
138 OutputMapFile::OutputMap(_) => {
139 if let Some(parent) = output_path.parent() {
140 std::fs::create_dir_all(parent)?;
141 }
142 std::fs::write(&output_path, serde_json::to_string_pretty(self)?)?;
143 }
144 }
145 } else {
146 if let Some(parent) = output_path.parent() {
147 std::fs::create_dir_all(parent)?;
148 }
149 std::fs::write(&output_path, serde_json::to_string_pretty(self)?)?;
150 }
151
152 Ok(())
153 }
154
155 // TODO we should disallow infinite recursive links somehow
156 pub fn resolve(prefix: &Path, addr: &Path) -> anyhow::Result<Option<VirtualAddress>> {
157 let Some(output) = Self::read(prefix, addr)? else {
158 return Ok(None);
159 };
160
161 match output {
162 OutputMapFile::PointerToVirtual(virtual_address) => Self::resolve(prefix, &virtual_address),
163 OutputMapFile::OutputMap(_) => Ok(Some(VirtualAddress(addr.to_path_buf()))),
164 }
165 }
166
167 pub fn get(prefix: &Path, addr: &Path, key: &str) -> anyhow::Result<Option<String>> {
168 let Some(output) = Self::read(prefix, addr)? else {
169 return Ok(None);
170 };
171
172 match output {
173 OutputMapFile::PointerToVirtual(virtual_address) => Self::get(prefix, &virtual_address, key),
174 OutputMapFile::OutputMap(map) => Ok(map.get(key).cloned()),
175 }
176 }
177
178 pub fn apply_output_map(prefix: &Path, addr: &Path, output_map_exec: &OutputMapExec) -> anyhow::Result<Option<PathBuf>> {
179 let original = Self::read_recurse(prefix, addr)?.unwrap_or(OutputMapFile::OutputMap(HashMap::new()));
180
181 let OutputMapFile::OutputMap(mut original_map) = original else {
182 bail!(
183 "apply_output_map({}, {}): resolved to a link file!",
184 prefix.display(),
185 addr.display()
186 );
187 };
188
189 for (key, value) in output_map_exec {
190 match value {
191 Some(value) => {
192 original_map.insert(key.clone(), value.clone());
193 }
194 None => {
195 original_map.remove(key);
196 }
197 }
198 }
199
200 if original_map.is_empty() {
201 Ok(None)
202 } else {
203 OutputMapFile::OutputMap(original_map).write_recurse(prefix, addr)?;
204 Ok(Some(Self::path(prefix, addr)))
205 }
206 }
207
208 pub fn write_link(prefix: &Path, phy_addr: &Path, virt_addr: &Path) -> anyhow::Result<PathBuf> {
209 let output_map = Self::PointerToVirtual(virt_addr.to_path_buf());
210 output_map.write(prefix, phy_addr)?;
211 Ok(Self::path(prefix, phy_addr))
212 }
213
214 pub fn delete(prefix: &Path, addr: &Path) -> anyhow::Result<Option<PathBuf>> {
215 let path = Self::path(prefix, addr);
216 if path.is_file() {
217 std::fs::remove_file(&path)?;
218 Ok(Some(path))
219 } else {
220 Ok(None)
221 }
222 }
223}
224
225pub mod handle;
226pub mod shutdown;
227pub mod spawn;
228pub mod task_registry;
229
// Bitmask of the roles a file can play for a connector, as returned by
// `Connector::filter()`. Flags can be combined: the `task_exec` docs describe
// returning `FilterResponse::Task | ...`.
#[bitmask_enum::bitmask(u32)]
#[derive(Serialize, Deserialize)]
pub enum FilterResponse {
    // The file is a configuration file controlling the connector.
    Config,
    // The file falls within the connector's resource address space.
    Resource,
    // NOTE(review): presumably marks bundle files handled by `Connector::unbundle()` — confirm.
    Bundle,
    // The file is a task body, executed through `Connector::task_exec()`.
    Task,
    // NOTE(review): presumably relates to `list_metrics()` / `read_metric()` — confirm.
    Metric,
    // No bits set: the file is neither a resource nor a config file for the connector.
    None = 0b0,
}
240
/// Newtype around a path that marks it as a "virtual" address. This is what
/// `OutputMapFile::resolve` returns for the address that holds the actual output map.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VirtualAddress(pub PathBuf);
243
/// Newtype around a path that marks it as a "physical" address.
/// NOTE(review): presumably the canonical, ID-derived address described in the
/// `VirtToPhyResponse` docs — confirm against callers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PhysicalAddress(pub PathBuf);
246
#[derive(Debug, Serialize, Deserialize)]
/// GetResourceResponse represents the successful result of Connector.get(addr).
/// Where a resource exists at `addr` that is fetched by the connector,
/// `resource_definition` will contain the connector's representation of that
/// resource as raw bytes, and `outputs` will contain the resource's output values, if any.
pub struct GetResourceResponse {
    /// The connector's serialized representation of the resource body.
    pub resource_definition: Vec<u8>,
    /// An optional virtual address supplied by the connector. `write()` uses it in
    /// place of the caller's `virt_addr` when the caller passes identical physical
    /// and virtual addresses.
    pub virt_addr: Option<PathBuf>,
    /// Output values for the resource; `write()` persists them only when non-empty.
    pub outputs: Option<OutputMap>,
}
257
258impl GetResourceResponse {
259 /// Write the contents of this GetResourceResponse to disk. Assumes that the caller has the current
260 /// directory set to the repo root.
261 /// Returns a Vec of the file paths that were actually written.
262 pub async fn write(self, prefix: &Path, phy_addr: &Path, virt_addr: &Path) -> anyhow::Result<Vec<PathBuf>> {
263 let virt_addr = if phy_addr == virt_addr
264 && let Some(ref self_virt_addr) = self.virt_addr
265 {
266 self_virt_addr
267 } else {
268 virt_addr
269 };
270
271 let mut res = Vec::new();
272
273 let body = self.resource_definition;
274 let res_path = prefix.join(virt_addr);
275
276 if let Some(parent) = res_path.parent() {
277 tokio::fs::create_dir_all(parent).await?;
278 }
279
280 tokio::fs::write(&res_path, body).await?;
281
282 res.push(res_path);
283
284 if let Some(outputs) = self.outputs
285 && !outputs.is_empty()
286 {
287 let output_map_file = OutputMapFile::OutputMap(outputs);
288 res.push(output_map_file.write(prefix, virt_addr)?);
289
290 if virt_addr != phy_addr {
291 res.push(OutputMapFile::write_link(prefix, phy_addr, virt_addr)?);
292 }
293 }
294
295 Ok(res)
296 }
297}
298
#[derive(Debug, Clone, Serialize, Deserialize)]
/// DocIdent represents the target of Connector::get_docstring().
/// It is used by connector implementations to determine which specific item to return documentation for,
/// whether it be a struct, an enum variant, or a field of a struct.
pub enum DocIdent {
    /// A struct, identified by its name.
    Struct { name: String },
    // Enum { name: String },
    /// A variant `name` of the enum `parent`.
    EnumVariant { parent: String, name: String },
    /// A field `name` of the struct `parent`.
    Field { parent: String, name: String },
}
309
#[derive(Debug, Serialize, Deserialize)]
/// GetDocResponse represents the successful result of Connector.get_docstring(ident).
/// This represents the docstring or other documentation corresponding to
/// structs or enums used in resource bodies.
/// Just like Connector::diag(), it is intended for use with autoschematic-lsp
/// to help users write resource bodies manually.
pub struct GetDocResponse {
    /// Name of the documented item's type. Left empty when the response is built from a plain string.
    pub r#type: String,
    /// The documentation text itself.
    pub markdown: String,
    /// Field names of the documented type. Empty for field-level and plain-string responses.
    pub fields: Vec<String>,
}
321
322impl GetDocResponse {
323 pub fn from_documented<T: Documented + DocumentedFields>() -> Self {
324 Self {
325 r#type: std::any::type_name::<T>().into(),
326 markdown: T::DOCS.to_string(),
327 fields: T::FIELD_NAMES.iter().map(|s| String::from(*s)).collect(),
328 }
329 }
330
331 pub fn from_documented_field<T: FieldTypes + DocumentedFields>(field: &str) -> Result<Self, documented::Error> {
332 Ok(Self {
333 r#type: T::field_type(field).unwrap_or_default().to_string(),
334 markdown: T::get_field_docs(field)?.to_string(),
335 fields: Vec::new(),
336 })
337 }
338}
339
340impl From<&'static str> for GetDocResponse {
341 fn from(value: &'static str) -> Self {
342 Self {
343 r#type: String::new(),
344 markdown: value.to_string(),
345 fields: Vec::new(),
346 }
347 }
348}
349
#[derive(Debug, Serialize, Deserialize, Clone)]
/// PlanResponseElement represents the successful result of `Connector.plan(addr, current, desired)`.
/// Specifically, Connector.plan(...) will return a list of one or more PlanResponseElements representing
/// a sequence of steps to take such that ideally, Connector.get(addr) == desired after executing
/// each of those steps.
pub struct PlanResponseElement {
    /// The op in string form, as later passed to `Connector::op_exec(addr, op)`.
    pub op_definition: String,
    /// NOTE(review): presumably the names of the output values this op will write — confirm.
    pub writes_outputs: Vec<String>,
    /// Optional human-readable description of this step.
    pub friendly_message: Option<String>,
}
360
#[derive(Debug, Clone, Serialize, Deserialize)]
/// OpExecResponse represents the result of a Connector successfully executing a ConnectorOp.
/// Where a ConnectorOp may, for example, return the ID of a created resource,
/// OpExecResponse may be used to store that ID in `outputs`, where it will be
/// saved in the output file for the address at which the op was executed
/// (see `OutputMapFile::path` for where that `.out.json` file lives).
pub struct OpExecResponse {
    /// Output changes produced by the op: `Some(value)` sets a key, `None` removes it.
    pub outputs: Option<OutputMapExec>,
    /// Optional human-readable description of what was done.
    pub friendly_message: Option<String>,
}
370
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
/// TaskExecResponse represents the result of a Connector successfully executing one phase of a Task.
/// The `Default` value (every field `None`) signals a finished task with nothing to report.
pub struct TaskExecResponse {
    /// The value of `state` with which to call task_exec(...) next time. If None, the task is not executed again.
    pub next_state: Option<Vec<u8>>,
    /// If a task modifies a file on-disk within its repository, it can let the client know
    /// by including it here.
    /// Paths should be relative to the root of the repository.
    pub modified_files: Option<Vec<PathBuf>>,
    /// Task files, like Resource files, can have associated outputs. Outputs returned here are merged into the task's
    /// output file.
    pub outputs: Option<HashMap<String, Option<String>>>,
    /// Tasks may also return secret values, keyed by path, for the runtime to optionally seal and write to disk if desired.
    pub secrets: Option<HashMap<PathBuf, Option<String>>>,
    /// Each task_exec phase can return a friendly human-readable message detailing its state.
    pub friendly_message: Option<String>,
    /// Delay the next task_exec phase until at least `delay_until` seconds after the UNIX epoch.
    pub delay_until: Option<u64>,
}
390
#[derive(Debug, Serialize, Deserialize)]
/// SkeletonResponse represents a template of a resource managed by a Connector.
/// A connector can return multiple skeletons through get_skeletons() in order to
/// provide the user with a set of templates to quickly scaffold new resources, or as
/// examples of the kinds of resources that the user can instantiate and manage through the connector.
pub struct SkeletonResponse {
    /// The skeleton's address; per the `get_skeletons()` docs, variable portions are written in `[square_brackets]`.
    pub addr: PathBuf,
    /// The example resource body, as raw bytes.
    pub body: Vec<u8>,
}
400
/// ConnectorOutbox is the sending half of a broadcast channel, handed to `Connector::new()`.
/// It is primarily used to transmit logs for tracestores across
/// the remote bridge, i.e. tarpc. The usefulness of this may be reexamined later.
pub type ConnectorOutbox = tokio::sync::broadcast::Sender<Option<OsString>>;
/// ConnectorInbox is the receiving half matching `ConnectorOutbox`.
pub type ConnectorInbox = tokio::sync::broadcast::Receiver<Option<OsString>>;
405
#[derive(Debug, Serialize, Deserialize)]
/// VirtToPhyResponse represents the result of Connector::addr_virt_to_phy(addr).
/// A connector implementation may map a "virtual" name to, for instance, a route table ID within
/// a VPC, or an EC2 instance ID. This allows resources to be created within a repo and named or laid out
/// ahead of their actual creation, even though their "canonical" address (their instance ID etc.)
/// is not known until after creation.
pub enum VirtToPhyResponse {
    /// The resource is defined at a "virtual" address, but its physical address is not populated
    /// because it does not exist yet. For example, an EC2 instance may have been
    /// drafted in the repository, but because it doesn't exist yet, its physical address
    /// (in essence its EC2 instance ID) is undefined.
    NotPresent,
    /// The resource is defined at a "virtual" address, but its physical address is not populated
    /// because it does not exist yet, and in addition, its physical address relies on a
    /// parent resource that also does not exist yet.
    /// For example, a new subnet within a new VPC may have been
    /// drafted in the repository, but because the VPC does not exist yet,
    /// the subnet is "deferred". It cannot even be planned until the parent VPC exists.
    /// NOTE(review): the carried `ReadOutput`s are presumably the outputs being waited on — confirm.
    Deferred(Vec<ReadOutput>),
    /// The virtual address resolved successfully to a physical address.
    /// For example, an EC2 instance within a repository exists and resolved to its canonical instance-id-derived address.
    /// E.g.: aws/ec2/instances/main_supercomputer_1.ron
    ///    -> aws/ec2/instances/i-398180832.ron
    Present(PathBuf),
    /// For virtual addresses that have no need to map to physical addresses, this represents that trivial mapping.
    /// This is what the default `Connector::addr_virt_to_phy` returns, carrying the input address unchanged.
    Null(PathBuf),
}
433
/// The interface every connector implements. Methods without a default body are
/// required; the rest have no-op or pass-through defaults.
#[async_trait]
#[allow(clippy::new_ret_no_self)]
pub trait Connector: Send + Sync {
    /// Attempt to create a new, uninitialized Connector mounted at `prefix`.
    /// Returns `dyn Connector` to allow implementations to dynamically select Connectors by name.
    /// Should not fail due to invalid config - Connector::init() should handle that.
    async fn new(name: &str, prefix: &Path, outbox: ConnectorOutbox) -> Result<Arc<dyn Connector>, anyhow::Error>
    where
        Self: Sized;

    /// Attempt to initialize, or reinitialize, a Connector.
    /// This will read from environment variables, config files, etc and
    /// may fail on invalid configuration.
    /// Methods like Connector::eq() and Connector::diag() may
    /// still be possible even when uninitialized or when
    /// `Connector::init()` has failed.
    async fn init(&self) -> Result<(), anyhow::Error>;

    /// For a given file within a prefix, this function determines if that file
    /// corresponds to a resource managed by this connector, a configuration file
    /// controlling this connector, or neither.
    /// In essence, this decides on the subset of the address space that the connector
    /// will manage, where "address space" is the nested hierarchy of files.
    /// If `addr` falls within the resource address space of this connector, return `FilterResponse::Resource`.
    /// If `addr` is a configuration file for this connector, return `FilterResponse::Config`.
    /// Otherwise, return `FilterResponse::None`.
    /// filter() is cached by the upstream client, but that cache is reset if
    /// a file is modified for which connector.filter() returns `FilterResponse::Config`.
    /// So, filter() can read config files to, say, have multiple k8s clusters under its management,
    /// or multiple SSH hosts to manage files over sshfs, and thus dynamically determine
    /// whether, say, sshfs/hosts/example.com/var/www/some_file.txt is `Resource` or `None`
    /// (depending on whether example.com is listed in its config file or not),
    /// but if it implements this behaviour, it must declare that config file by returning
    /// `FilterResponse::Config` for the config file path.
    async fn filter(&self, addr: &Path) -> Result<FilterResponse, anyhow::Error>;

    /// List all "extant" (e.g., currently existing in AWS, k8s, etc...) object paths, whether they exist in local config or not.
    /// `subpath` is used to constrain the space of queried results.
    /// The subpath "./" applies no constraint. Connectors may choose to interpret subpath in order to,
    /// e.g., avoid redundant network queries, but there is no requirement that they do so.
    /// The upstream client will automatically handle more fine-grained filtering.
    /// See [addr_matches_filter] for the implementation of that filtering.
    /// For example, if the subpath is "aws/vpc/us-east-2/vpcs/", the AWS VPC connector might choose to parse
    /// that subpath up to "aws/vpc/us-east-2/" and only run the list queries for resources in the us-east-2 region.
    /// It will still return a superset of the resources specified by the full subpath, but a subset of those
    /// specified by "./" - again, clients will do the more fine-grained filtering themselves after the fact.
    /// Connectors inform clients about how deeply they'll parse `subpath` through the `Connector::subpaths()` function.
    async fn list(&self, subpath: &Path) -> anyhow::Result<Vec<PathBuf>>;

    /// Describes how the connector's list() implementation orthogonally subdivides the address space in order to
    /// more efficiently parallelize imports spanning large address spaces.
    /// For example, the AWS VPC connector's subpaths() implementation might
    /// return ["aws/vpc/us-east-1", "aws/vpc/us-east-2", ...]
    /// in order to represent to the client that it can efficiently run parallel list() operations
    /// under those subpaths. Then, the list() implementation must guarantee that it can
    /// correctly parse and limit its querying to each subset of the address space.
    /// The implementation could even be more fine-grained and return, for instance,
    /// ["aws/vpc/us-east-1/vpcs", "aws/vpc/us-east-1/internet_gateways", "aws/vpc/us-east-2/vpcs", "aws/vpc/us-east-2/internet_gateways"]
    /// if it can do even deeper parsing, but this example would likely see diminishing returns.
    /// The default returns the single subpath "./", i.e. no subdivision.
    async fn subpaths(&self) -> anyhow::Result<Vec<PathBuf>> {
        Ok(vec![PathBuf::from("./")])
    }

    /// Get the current "real" state of the object at `addr`.
    /// For instance, get("aws/vpc/us-east-1/vpcs/vpc-0348895.ron") would query the AWS API for the
    /// current state of the vpc with that ID in the us-east-1 region in the account configured,
    /// and form the human-readable (code) representation as contained in the .ron file.
    /// Note that get() only takes physical addresses - the addr_virt_to_phy mapping is always carried out by the
    /// client ahead of time where needed.
    async fn get(&self, addr: &Path) -> Result<Option<GetResourceResponse>, anyhow::Error>;

    /// Determine how to set current -> desired.
    /// Returns a sequence of Ops that can be executed by op_exec.
    /// This function essentially takes, for a given resource address,
    /// the current state as returned by `Connector::get(addr)` (or None if that resource does not exist),
    /// as well as the desired state (as defined by the state of the file at `./${prefix}/${addr}` on disk).
    /// It will return a set of ConnectorOps which, when passed to op_exec and executed successfully in sequence, should
    /// result in Connector::get(addr)? == desired.
    /// Note that if desired is None, and current is Some(...), this indicates deleting the remote resource at addr.
    async fn plan(
        &self,
        addr: &Path,
        current: Option<Vec<u8>>,
        desired: Option<Vec<u8>>,
    ) -> Result<Vec<PlanResponseElement>, anyhow::Error>;

    /// Execute a ConnectorOp.
    /// OpExecResponse may include outputs, containing, for example,
    /// the resultant IDs of created resources such as EC2 instances or VPCs.
    /// These are stored in the output file for `addr`
    /// (`.autoschematic/{prefix}/{addr}.out.json`, see `OutputMapFile::path`),
    /// or merged if already present.
    async fn op_exec(&self, addr: &Path, op: &str) -> Result<OpExecResponse, anyhow::Error>;

    /// For resources like VPCs whose ID cannot be known until after creation,
    /// we allow returning the resultant vpc_id in outputs after get() or op_exec().
    /// This allows connectors to translate this mapping and resolve a "virtual" path, with a
    /// user-provided "fake" ID (like a human-readable name), into a physical path, with the actual canonical resource ID.
    /// For example, if we created a VPC with virtual addr "aws/vpc/eu-west-2/vpcs/main.ron",
    /// then after creation, we'd have two output files under `.autoschematic/{prefix}/`:
    /// `aws/vpc/eu-west-2/vpcs/vpc-038598204.ron.out.json` -> PointerToVirtual("aws/vpc/eu-west-2/vpcs/main.ron")
    /// `aws/vpc/eu-west-2/vpcs/main.ron.out.json` -> OutputMap({vpc_id: "vpc-038598204", ...})
    /// addr_virt_to_phy is where connectors define this mapping: that the physical address can be formed from the
    /// virtual address by reading the output map and substituting certain path components.
    /// Most connectors shouldn't need to override this; the default returns
    /// `VirtToPhyResponse::Null(addr)`, the trivial mapping.
    async fn addr_virt_to_phy(&self, addr: &Path) -> Result<VirtToPhyResponse, anyhow::Error> {
        Ok(VirtToPhyResponse::Null(addr.into()))
    }

    /// Reverses the mapping from addr_virt_to_phy. Usually this just means traversing the resource
    /// hierarchy for each parent resource and resolving it with ResourceAddress::phy_to_virt.
    /// See the AWS VpcConnector for an example implementation.
    /// Most connectors shouldn't need to implement this and can just return `addr`, as the default does.
    /// `None` indicates a failure to resolve the address, such as a dangling output file at a physical address
    /// with a PointerToVirtual entry that no longer exists.
    async fn addr_phy_to_virt(&self, addr: &Path) -> anyhow::Result<Option<PathBuf>> {
        Ok(Some(addr.into()))
    }

    /// To aid development, connectors can provide the user with a set of
    /// "skeleton" resources outlining each type of resource managed by the connector.
    /// Each skeleton resource has an address with `[square_brackets]` for the variable portions,
    /// and the body of the resource should serve as a valid example instance of the resource.
    /// The default returns no skeletons.
    async fn get_skeletons(&self) -> anyhow::Result<Vec<SkeletonResponse>> {
        Ok(Vec::new())
    }

    /// Connectors may additionally serve docstrings. This is intended to aid development
    /// from an IDE or similar, with a language server hooking into connectors on hover.
    /// The default returns `None`, i.e. no documentation available.
    async fn get_docstring(&self, _addr: &Path, _ident: DocIdent) -> anyhow::Result<Option<GetDocResponse>> {
        Ok(None)
    }

    /// Corresponds to an implementation of PartialEq for the underlying resource types
    /// as parsed by the connector.
    /// This is used in, for example, pull-state, in order to determine if local state needs to be updated
    /// to match remote state.
    /// The default implementation simply compares the raw bytes, without serializing or parsing in any way.
    /// addr is ignored in this default case.
    async fn eq(&self, _addr: &Path, a: &[u8], b: &[u8]) -> anyhow::Result<bool> {
        Ok(a == b)
    }

    /// If a resource at `addr` with body `a` fails to parse, connectors may return diagnostics
    /// that outline where the parsing failed with error information.
    /// This is intended to aid development from an IDE or similar, with a language server hooking into connectors.
    /// The default returns `None`, i.e. no diagnostics.
    async fn diag(&self, _addr: &Path, _a: &[u8]) -> anyhow::Result<Option<DiagnosticResponse>> {
        Ok(None)
    }

    /// Where a Connector or Bundle implementation may define a bundle, with its associated ResourceAddress and Resource formats,
    /// this is where that bundle will be unpacked into one or more resources.
    /// The default unpacks nothing and returns an empty Vec.
    async fn unbundle(&self, _addr: &Path, _bundle: &[u8]) -> anyhow::Result<Vec<UnbundleResponseElement>> {
        Ok(Vec::new())
    }

    /// task_exec represents the entrypoint, or continuation point, of a Connector's
    /// custom stateful task implementation. Connectors can implement imperative workflows associated with resources,
    /// or as standalone task bodies, by returning `FilterResponse::Task | ... ` in their filter() implementation.
    /// `arg` sets the initial argument for the task. `arg` is set to None after the first execution.
    /// `state` always starts as None when a task is first executed.
    /// Once invoked by a user, a connector client will repeatedly call task_exec until it returns next_state = None within `TaskExecResponse`,
    /// or returns an error. In this way, connectors store no state - only the clients.
    /// The default returns `TaskExecResponse::default()`, whose `next_state` is None, so the task ends immediately.
    async fn task_exec(
        &self,
        _addr: &Path,
        _body: Vec<u8>,

        _arg: Option<Vec<u8>>,
        _state: Option<Vec<u8>>,
    ) -> anyhow::Result<TaskExecResponse> {
        Ok(TaskExecResponse::default())
    }

    /// List the names of the metrics available at `addr`. The default returns none.
    /// Design: TODO: How shall we define the GetMetricResponse enum?
    /// Again, how will metrics be stored by the server and queried?
    async fn list_metrics(&self, _addr: &Path) -> anyhow::Result<Vec<String>> {
        Ok(Vec::new())
    }

    /// Read the metric `name` at `addr`. The return type is currently `()`, so no
    /// metric value is carried back; the default does nothing.
    async fn read_metric(&self, _addr: &Path, _name: &str) -> anyhow::Result<()> {
        Ok(())
    }

    /// Report the connector's version. The default returns the `CARGO_PKG_VERSION`
    /// of the crate that defines this trait, captured at compile time.
    async fn version(&self) -> anyhow::Result<String> {
        Ok(env!("CARGO_PKG_VERSION").to_string())
    }
}
620
621// Helper traits for defining custom internal types in Connector implementations.
622// Note that such types are erased by definition at the Connector interface boundary.
623
/// Resource represents a resource body, either the contents of a file on disk, or
/// a virtual, remote resource as returned by `Connector::get(addr)`.
/// Connectors implement and consume their own Resource types. The actual
/// types themselves are erased at the interface between Autoschematic and the Connector implementations
/// it instantiates.
/// For example, Connector::plan(addr, current, desired) takes a `&Path`, `Option<Vec<u8>>`, `Option<Vec<u8>>`,
/// and the connector implementation will parse that as an internal implementation of ResourceAddress, `Option<Resource>`, `Option<Resource>`,
/// in order to produce a Vec of structs that implement ConnectorOp, which it will then pass back in serialized form
/// (as the `op_definition` string of each `PlanResponseElement`).
/// Then, Connector::op_exec(addr, connector_op) will similarly parse the raw addr and connector_op into its internal implementations of ResourceAddress and ConnectorOp
/// in order to interpret them and execute an operation.
pub trait Resource: Send + Sync {
    /// Serialize this resource into the raw bytes exchanged across the Connector interface.
    fn to_bytes(&self) -> anyhow::Result<Vec<u8>>;

    /// Parse a resource from raw bytes. `addr` is supplied so that the address can
    /// inform how the body is interpreted.
    fn from_bytes(addr: &impl ResourceAddress, s: &[u8]) -> anyhow::Result<Self>
    where
        Self: Sized;
}
641
642/// A ResourceAddress represents a unique identifier addressing a single resource
643/// unambiguously within a prefix. A given ResourceAddress should have a static,
644/// bidirectional mapping to a relative file path.
645/// `{prefix}/{addr.to_path_buf()}` with a given connector configuration should therefore
646/// serve to globally and uniquely identify a particular resource of a particular type.
647pub trait ResourceAddress: Send + Sync + Clone + std::fmt::Debug {
648 /// Produce the path in the repository corresponding to this resource address
649 fn to_path_buf(&self) -> PathBuf;
650
651 /// Produce the resource address corresponding to a path
652 fn from_path(path: &Path) -> anyhow::Result<Self>
653 where
654 Self: Sized;
655
656 fn get_output(&self, prefix: &Path, key: &str) -> anyhow::Result<Option<String>> {
657 let addr = self.to_path_buf();
658 OutputMapFile::get(prefix, &addr, key)
659 }
660
661 fn phy_to_virt(&self, prefix: &Path) -> anyhow::Result<Option<Self>> {
662 let Some(virt_addr) = OutputMapFile::resolve(prefix, &self.to_path_buf())? else {
663 return Ok(None);
664 };
665
666 Ok(Some(Self::from_path(&virt_addr.0)?))
667 }
668}
669
/// A ConnectorOp represents a single discrete operation that a Connector can execute.
/// Not all ConnectorOps can be truly idempotent, but they should make efforts to be so.
/// ConnectorOps should be as granular as the Connector requires.
/// ConnectorOps are always executed with an associated ResourceAddress
/// through Connector::op_exec(ResourceAddress, ConnectorOp).
pub trait ConnectorOp: Send + Sync + std::fmt::Debug {
    /// Serialize this op into the string form that crosses the Connector interface.
    fn to_string(&self) -> anyhow::Result<String>;
    /// Parse an op back from its string form.
    fn from_str(s: &str) -> anyhow::Result<Self>
    where
        Self: Sized;
    /// A human-readable message in plain English explaining what this connector op will do.
    /// You should use the imperative mood, for example:
    /// "Modify tags for IAM role Steve", or
    /// "Delete VPC vpc-923898 in region us-south-5"
    /// The default provides no message.
    fn friendly_plan_message(&self) -> Option<String> {
        None
    }
    /// A human-readable message in plain English explaining what this connector op has just done.
    /// You should use the past-tense indicative mood, for example:
    /// "Modified tags for IAM role Steve", or
    /// "Deleted VPC vpc-923898 in region us-south-5"
    /// The default provides no message.
    fn friendly_exec_message(&self) -> Option<String> {
        None
    }
}
695
696#[async_trait]
697impl Connector for Arc<dyn Connector> {
698 async fn new(name: &str, prefix: &Path, outbox: ConnectorOutbox) -> anyhow::Result<Arc<dyn Connector>> {
699 return Self::new(name, prefix, outbox).await;
700 }
701
702 async fn init(&self) -> anyhow::Result<()> {
703 Connector::init(self.as_ref()).await
704 }
705
706 async fn version(&self) -> anyhow::Result<String> {
707 Connector::version(self.as_ref()).await
708 }
709
710 async fn filter(&self, addr: &Path) -> anyhow::Result<FilterResponse> {
711 Connector::filter(self.as_ref(), addr).await
712 }
713
714 async fn list(&self, subpath: &Path) -> anyhow::Result<Vec<PathBuf>> {
715 Connector::list(self.as_ref(), subpath).await
716 }
717
718 async fn subpaths(&self) -> anyhow::Result<Vec<PathBuf>> {
719 Connector::subpaths(self.as_ref()).await
720 }
721
722 async fn get(&self, addr: &Path) -> anyhow::Result<Option<GetResourceResponse>> {
723 Connector::get(self.as_ref(), addr).await
724 }
725
726 async fn plan(
727 &self,
728 addr: &Path,
729 current: Option<Vec<u8>>,
730 desired: Option<Vec<u8>>,
731 ) -> anyhow::Result<Vec<PlanResponseElement>> {
732 Connector::plan(self.as_ref(), addr, current, desired).await
733 }
734
735 async fn op_exec(&self, addr: &Path, op: &str) -> anyhow::Result<OpExecResponse> {
736 Connector::op_exec(self.as_ref(), addr, op).await
737 }
738
739 async fn addr_virt_to_phy(&self, addr: &Path) -> anyhow::Result<VirtToPhyResponse> {
740 Connector::addr_virt_to_phy(self.as_ref(), addr).await
741 }
742
743 async fn addr_phy_to_virt(&self, addr: &Path) -> anyhow::Result<Option<PathBuf>> {
744 Connector::addr_phy_to_virt(self.as_ref(), addr).await
745 }
746
747 async fn get_docstring(&self, addr: &Path, ident: DocIdent) -> anyhow::Result<Option<GetDocResponse>> {
748 Connector::get_docstring(self.as_ref(), addr, ident).await
749 }
750
751 async fn get_skeletons(&self) -> anyhow::Result<Vec<SkeletonResponse>> {
752 Connector::get_skeletons(self.as_ref()).await
753 }
754
755 async fn eq(&self, addr: &Path, a: &[u8], b: &[u8]) -> anyhow::Result<bool> {
756 Connector::eq(self.as_ref(), addr, a, b).await
757 }
758
759 async fn diag(&self, addr: &Path, a: &[u8]) -> anyhow::Result<Option<DiagnosticResponse>> {
760 Connector::diag(self.as_ref(), addr, a).await
761 }
762
763 async fn task_exec(
764 &self,
765 addr: &Path,
766 body: Vec<u8>,
767 arg: Option<Vec<u8>>,
768 state: Option<Vec<u8>>,
769 ) -> anyhow::Result<TaskExecResponse> {
770 Connector::task_exec(self.as_ref(), addr, body, arg, state).await
771 }
772
773 async fn unbundle(&self, addr: &Path, bundle: &[u8]) -> anyhow::Result<Vec<UnbundleResponseElement>> {
774 Connector::unbundle(self.as_ref(), addr, bundle).await
775 }
776}