Skip to main content

telltale_runtime/topology/
parser.rs

1//! Topology DSL parser.
2//!
3//! Parses topology definitions from DSL source code.
4
5use super::{Location, RoleFamilyConstraint, Topology, TopologyConstraint, TopologyMode};
6use crate::identifiers::{Endpoint as TopologyEndpoint, IdentifierError, Region, RoleName};
7use crate::ChannelCapacity;
8use pest::Parser;
9use pest_derive::Parser;
10use thiserror::Error;
11
/// Parser type for the topology DSL.
///
/// The `Parser` derive generates the implementation (and the `Rule` enum used
/// throughout this file) from the grammar file named in `#[grammar]`.
12#[derive(Parser)]
13#[grammar = "compiler/topology.pest"]
14struct TopologyParser;
15
16/// Errors that can occur during topology parsing.
17#[derive(Debug, Clone, Error)]
18pub enum TopologyParseError {
    /// The input did not match the grammar; holds the rendered pest error text.
19    #[error("Parse error: {0}")]
20    ParseError(String),
21
    /// The `mode` value is not accepted; holds the mode as written, including
    /// its parenthesised argument when one was given.
22    #[error("Unknown mode: {0}")]
23    UnknownMode(String),
24
    /// A location could not be built, e.g. `colocated` without a peer role.
25    #[error("Invalid location: {0}")]
26    InvalidLocation(String),
27
    /// A declaration is missing a required part. Used for constraints, and also
    /// for mappings, channel capacities and role constraints in this file.
28    #[error("Invalid constraint: {0}")]
29    InvalidConstraint(String),
30
    /// A channel capacity was not a valid `u32` or was rejected by
    /// `ChannelCapacity::try_new`.
31    #[error("Invalid capacity: {0}")]
32    InvalidCapacity(String),
33
    /// A role, region or endpoint identifier failed validation.
34    #[error("Invalid identifier: {0}")]
35    InvalidIdentifier(IdentifierError),
36}
37
38impl From<pest::error::Error<Rule>> for TopologyParseError {
39    fn from(e: pest::error::Error<Rule>) -> Self {
40        TopologyParseError::ParseError(e.to_string())
41    }
42}
43
44impl From<IdentifierError> for TopologyParseError {
45    fn from(err: IdentifierError) -> Self {
46        TopologyParseError::InvalidIdentifier(err)
47    }
48}
49
50/// Parsed topology with metadata.
///
/// This is the value returned by `parse_topology`. The two name fields are
/// copied verbatim from the source text and are empty strings if the parse
/// tree did not supply them.
51#[derive(Debug, Clone)]
52pub struct ParsedTopology {
53    /// Name of this topology configuration.
54    pub name: String,
55    /// Name of the choreography this topology is for.
56    pub for_choreography: String,
57    /// The topology configuration.
58    pub topology: Topology,
59}
60
61/// Parse a topology definition from DSL source.
62pub fn parse_topology(input: &str) -> Result<ParsedTopology, TopologyParseError> {
63    let pairs = TopologyParser::parse(Rule::topology, input)?;
64    let mut name = String::new();
65    let mut for_choreography = String::new();
66    let mut topology = Topology::new();
67
68    for pair in pairs {
69        if pair.as_rule() == Rule::topology {
70            let mut inner = pair.into_inner();
71
72            // Get topology name
73            if let Some(name_pair) = inner.next() {
74                name = name_pair.as_str().to_string();
75            }
76
77            // Get choreography name (after "for")
78            if let Some(for_pair) = inner.next() {
79                for_choreography = for_pair.as_str().to_string();
80            }
81
82            // Parse topology body
83            if let Some(body_pair) = inner.next() {
84                topology = parse_topology_body(body_pair)?;
85            }
86        }
87    }
88
89    Ok(ParsedTopology {
90        name,
91        for_choreography,
92        topology,
93    })
94}
95
96fn parse_topology_body(pair: pest::iterators::Pair<Rule>) -> Result<Topology, TopologyParseError> {
97    let mut topology = Topology::new();
98
99    for inner in pair.into_inner() {
100        match inner.as_rule() {
101            Rule::topology_mode => {
102                topology.mode = Some(parse_topology_mode(inner)?);
103            }
104            Rule::topology_mappings => {
105                for mapping in inner.into_inner() {
106                    let (role, location) = parse_topology_mapping(mapping)?;
107                    topology.locations.insert(role, location);
108                }
109            }
110            Rule::topology_constraints => {
111                for constraint in inner.into_inner() {
112                    if constraint.as_rule() == Rule::constraint_decl {
113                        topology.constraints.push(parse_constraint(constraint)?);
114                    }
115                }
116            }
117            Rule::channel_capacities_block => {
118                for decl in inner.into_inner() {
119                    if decl.as_rule() == Rule::channel_capacity_decl {
120                        let (sender, receiver, capacity) = parse_channel_capacity_decl(decl)?;
121                        topology
122                            .channel_capacities
123                            .insert((sender, receiver), capacity);
124                    }
125                }
126            }
127            Rule::role_constraints_block => {
128                for decl in inner.into_inner() {
129                    if decl.as_rule() == Rule::role_constraint_decl {
130                        let (family, constraint) = parse_role_constraint_decl(decl)?;
131                        topology.role_constraints.insert(family, constraint);
132                    }
133                }
134            }
135            _ => {}
136        }
137    }
138
139    Ok(topology)
140}
141
142fn parse_topology_mode(
143    pair: pest::iterators::Pair<Rule>,
144) -> Result<TopologyMode, TopologyParseError> {
145    for inner in pair.into_inner() {
146        if inner.as_rule() == Rule::topology_mode_value {
147            return parse_mode_value(inner);
148        }
149    }
150    Err(TopologyParseError::UnknownMode("empty mode".to_string()))
151}
152
153fn parse_mode_value(pair: pest::iterators::Pair<Rule>) -> Result<TopologyMode, TopologyParseError> {
154    let mut inner = pair.into_inner();
155    let mode_name = inner
156        .next()
157        .map(|p| p.as_str().to_string())
158        .ok_or_else(|| TopologyParseError::UnknownMode("empty mode".to_string()))?;
159    let mode_arg = inner.next().map(|p| p.as_str().to_string());
160    match (mode_name.as_str(), mode_arg.as_deref()) {
161        ("local", None) => Ok(TopologyMode::Local),
162        ("local", Some(arg)) => Err(TopologyParseError::UnknownMode(format!("local({arg})"))),
163        ("per_role", arg) => Err(TopologyParseError::UnknownMode(match arg {
164            Some(arg) => format!("per_role({arg})"),
165            None => "per_role".to_string(),
166        })),
167        ("kubernetes", arg) => Err(TopologyParseError::UnknownMode(match arg {
168            Some(arg) => format!("kubernetes({arg})"),
169            None => "kubernetes".to_string(),
170        })),
171        ("consul", arg) => Err(TopologyParseError::UnknownMode(match arg {
172            Some(arg) => format!("consul({arg})"),
173            None => "consul".to_string(),
174        })),
175        (other, Some(arg)) => Err(TopologyParseError::UnknownMode(format!("{other}({arg})"))),
176        (other, None) => Err(TopologyParseError::UnknownMode(other.to_string())),
177    }
178}
179
180fn parse_topology_mapping(
181    pair: pest::iterators::Pair<Rule>,
182) -> Result<(RoleName, Location), TopologyParseError> {
183    let mut inner = pair.into_inner();
184    let role = inner
185        .next()
186        .map(|p| RoleName::new(p.as_str()))
187        .transpose()?
188        .ok_or_else(|| TopologyParseError::InvalidConstraint("missing role".to_string()))?;
189    let location = inner
190        .next()
191        .map(|p| parse_location(p))
192        .transpose()?
193        .unwrap_or(Location::Local);
194
195    Ok((role, location))
196}
197
198fn parse_location(pair: pest::iterators::Pair<Rule>) -> Result<Location, TopologyParseError> {
199    let inner = pair.into_inner().next();
200    match inner {
201        Some(p) => match p.as_rule() {
202            Rule::local_location => Ok(Location::Local),
203            Rule::colocated_location => {
204                let peer = p
205                    .into_inner()
206                    .next()
207                    .map(|i| RoleName::new(i.as_str()))
208                    .transpose()?
209                    .ok_or_else(|| {
210                        TopologyParseError::InvalidLocation("colocated requires a role".to_string())
211                    })?;
212                Ok(Location::Colocated(peer))
213            }
214            Rule::endpoint => Ok(Location::Remote(TopologyEndpoint::new(p.as_str())?)),
215            _ => {
216                let s = p.as_str();
217                if s == "local" {
218                    Ok(Location::Local)
219                } else {
220                    Ok(Location::Remote(TopologyEndpoint::new(s)?))
221                }
222            }
223        },
224        None => Ok(Location::Local),
225    }
226}
227
228fn parse_constraint(
229    pair: pest::iterators::Pair<Rule>,
230) -> Result<TopologyConstraint, TopologyParseError> {
231    let inner = pair
232        .into_inner()
233        .next()
234        .ok_or_else(|| TopologyParseError::InvalidConstraint("empty constraint".to_string()))?;
235
236    match inner.as_rule() {
237        Rule::colocated_constraint => {
238            let mut idents = inner.into_inner();
239            let r1 = idents
240                .next()
241                .map(|p| RoleName::new(p.as_str()))
242                .transpose()?
243                .ok_or_else(|| {
244                    TopologyParseError::InvalidConstraint(
245                        "colocated requires two roles".to_string(),
246                    )
247                })?;
248            let r2 = idents
249                .next()
250                .map(|p| RoleName::new(p.as_str()))
251                .transpose()?
252                .ok_or_else(|| {
253                    TopologyParseError::InvalidConstraint(
254                        "colocated requires two roles".to_string(),
255                    )
256                })?;
257            Ok(TopologyConstraint::Colocated(r1, r2))
258        }
259        Rule::separated_constraint => {
260            let roles: Vec<RoleName> = inner
261                .into_inner()
262                .flat_map(|p| p.into_inner())
263                .map(|p| RoleName::new(p.as_str()))
264                .collect::<Result<Vec<_>, _>>()?;
265            // Separated constraints are binary; use first two roles
266            if roles.len() >= 2 {
267                Ok(TopologyConstraint::Separated(
268                    roles[0].clone(),
269                    roles[1].clone(),
270                ))
271            } else {
272                Err(TopologyParseError::InvalidConstraint(
273                    "separated requires at least 2 roles".to_string(),
274                ))
275            }
276        }
277        Rule::pinned_constraint => {
278            let mut inner_iter = inner.into_inner();
279            let role = inner_iter
280                .next()
281                .map(|p| RoleName::new(p.as_str()))
282                .transpose()?
283                .ok_or_else(|| {
284                    TopologyParseError::InvalidConstraint("pinned requires a role".to_string())
285                })?;
286            let location = inner_iter
287                .next()
288                .map(|p| parse_location(p))
289                .transpose()?
290                .unwrap_or(Location::Local);
291            Ok(TopologyConstraint::Pinned(role, location))
292        }
293        Rule::region_constraint => {
294            let mut idents = inner.into_inner();
295            let role = idents
296                .next()
297                .map(|p| RoleName::new(p.as_str()))
298                .transpose()?
299                .ok_or_else(|| {
300                    TopologyParseError::InvalidConstraint("region requires a role".to_string())
301                })?;
302            let region = idents
303                .next()
304                .map(|p| Region::new(p.as_str()))
305                .transpose()?
306                .ok_or_else(|| {
307                    TopologyParseError::InvalidConstraint("region requires a value".to_string())
308                })?;
309            Ok(TopologyConstraint::Region(role, region))
310        }
311        _ => Err(TopologyParseError::InvalidConstraint(format!(
312            "unknown constraint type: {:?}",
313            inner.as_rule()
314        ))),
315    }
316}
317
318fn parse_channel_capacity_decl(
319    pair: pest::iterators::Pair<Rule>,
320) -> Result<(RoleName, RoleName, ChannelCapacity), TopologyParseError> {
321    let mut inner = pair.into_inner();
322    let sender = inner
323        .next()
324        .map(|p| RoleName::new(p.as_str()))
325        .transpose()?
326        .ok_or_else(|| TopologyParseError::InvalidConstraint("missing sender".to_string()))?;
327    let receiver = inner
328        .next()
329        .map(|p| RoleName::new(p.as_str()))
330        .transpose()?
331        .ok_or_else(|| TopologyParseError::InvalidConstraint("missing receiver".to_string()))?;
332    let capacity = inner
333        .next()
334        .ok_or_else(|| TopologyParseError::InvalidConstraint("missing capacity".to_string()))?
335        .as_str()
336        .parse::<u32>()
337        .map_err(|e| TopologyParseError::InvalidCapacity(e.to_string()))?;
338    let capacity = ChannelCapacity::try_new(capacity)
339        .map_err(|e| TopologyParseError::InvalidCapacity(e.to_string()))?;
340
341    Ok((sender, receiver, capacity))
342}
343
344fn parse_role_constraint_decl(
345    pair: pest::iterators::Pair<Rule>,
346) -> Result<(String, RoleFamilyConstraint), TopologyParseError> {
347    let mut inner = pair.into_inner();
348
349    // Get family name
350    let family = inner
351        .next()
352        .map(|p| p.as_str().to_string())
353        .ok_or_else(|| {
354            TopologyParseError::InvalidConstraint("role constraint missing family name".to_string())
355        })?;
356
357    // Get constraint spec
358    let spec = inner.next().ok_or_else(|| {
359        TopologyParseError::InvalidConstraint("role constraint missing specification".to_string())
360    })?;
361
362    let constraint = parse_role_constraint_spec(spec)?;
363    Ok((family, constraint))
364}
365
366fn parse_role_constraint_spec(
367    pair: pest::iterators::Pair<Rule>,
368) -> Result<RoleFamilyConstraint, TopologyParseError> {
369    let mut min: Option<u32> = None;
370    let mut max: Option<u32> = None;
371
372    for inner in pair.into_inner() {
373        match inner.as_rule() {
374            Rule::min_constraint => {
375                let value = inner
376                    .into_inner()
377                    .next()
378                    .and_then(|p| p.as_str().parse::<u32>().ok())
379                    .ok_or_else(|| {
380                        TopologyParseError::InvalidConstraint(
381                            "min constraint requires integer value".to_string(),
382                        )
383                    })?;
384                min = Some(value);
385            }
386            Rule::max_constraint => {
387                let value = inner
388                    .into_inner()
389                    .next()
390                    .and_then(|p| p.as_str().parse::<u32>().ok())
391                    .ok_or_else(|| {
392                        TopologyParseError::InvalidConstraint(
393                            "max constraint requires integer value".to_string(),
394                        )
395                    })?;
396                max = Some(value);
397            }
398            _ => {}
399        }
400    }
401
402    Ok(RoleFamilyConstraint {
403        min: min.unwrap_or(0),
404        max,
405    })
406}
407
/// End-to-end tests that feed DSL text through `parse_topology` and inspect
/// the resulting `ParsedTopology`.
#[cfg(test)]
mod tests {
    use super::*;

    /// `mode: local` is accepted and the header names are captured.
    #[test]
    fn test_parse_local_mode_topology() {
        let input = r#"
            topology TestLocal for PingPong {
                mode: local
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(result.name, "TestLocal");
        assert_eq!(result.for_choreography, "PingPong");
        assert_eq!(result.topology.mode, Some(TopologyMode::Local));
    }

    /// `host:port` mappings become remote locations keyed by role.
    #[test]
    fn test_parse_topology_with_mappings() {
        let input = r#"
            topology Dev for PingPong {
                Alice: localhost:8080
                Bob: localhost:8081
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(result.name, "Dev");
        assert_eq!(
            result
                .topology
                .get_location(&RoleName::from_static("Alice"))
                .unwrap(),
            Location::Remote(TopologyEndpoint::new("localhost:8080").unwrap())
        );
        assert_eq!(
            result
                .topology
                .get_location(&RoleName::from_static("Bob"))
                .unwrap(),
            Location::Remote(TopologyEndpoint::new("localhost:8081").unwrap())
        );
    }

    /// Constraints are collected in source order with the expected kinds.
    #[test]
    fn test_parse_topology_with_constraints() {
        let input = r#"
            topology Prod for TwoPhaseCommit {
                Coordinator: coordinator.internal:9000
                ParticipantA: participant-a.internal:9000
                ParticipantB: participant-b.internal:9000

                constraints {
                    separated: Coordinator, ParticipantA
                    region: Coordinator -> us_east_1
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(result.name, "Prod");
        assert_eq!(result.topology.constraints.len(), 2);
        // Check the kinds as well as the count, so a constraint parsed into
        // the wrong variant cannot slip through.
        assert!(matches!(
            result.topology.constraints[0],
            TopologyConstraint::Separated(..)
        ));
        assert!(matches!(
            result.topology.constraints[1],
            TopologyConstraint::Region(..)
        ));
    }

    /// A `channel_capacities` entry is stored under its `(sender, receiver)` key.
    #[test]
    fn test_parse_channel_capacities() {
        let input = r#"
            topology Capacity for Protocol {
                Alice: local
                Bob: local

                channel_capacities {
                    Alice -> Bob: 4
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        let key = (RoleName::from_static("Alice"), RoleName::from_static("Bob"));
        let capacity = result.topology.channel_capacities.get(&key).copied();
        assert_eq!(
            capacity,
            Some(ChannelCapacity::try_new(4).expect("test capacity in range"))
        );
    }

    /// Deployment modes that are no longer supported must be rejected.
    #[test]
    fn test_parse_removed_deployment_modes_fail_closed() {
        for input in [
            r#"
                topology PerRole for MyProtocol {
                    mode: per_role
                }
            "#,
            r#"
                topology K8s for MyProtocol {
                    mode: kubernetes(myapp)
                }
            "#,
            r#"
                topology Consul for MyProtocol {
                    mode: consul(eucentral)
                }
            "#,
        ] {
            let err = parse_topology(input).expect_err("removed mode must reject");
            assert!(matches!(err, TopologyParseError::UnknownMode(_)));
        }
    }

    /// An unrecognised mode is reported exactly as it was written.
    #[test]
    fn test_parse_unknown_mode() {
        let input = r#"
            topology Unknown for MyProtocol {
                mode: edge_router(prod)
            }
        "#;

        let err = parse_topology(input).expect_err("unknown mode must reject");
        assert!(
            matches!(err, TopologyParseError::UnknownMode(mode) if mode == "edge_router(prod)")
        );
    }

    /// Local, colocated and remote locations can be mixed in one topology.
    #[test]
    fn test_parse_colocated_location() {
        let input = r#"
            topology Mixed for Protocol {
                Alice: local
                Bob: colocated(Alice)
                Carol: remote.host:8080
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(
            result
                .topology
                .get_location(&RoleName::from_static("Alice"))
                .unwrap(),
            Location::Local
        );
        assert_eq!(
            result
                .topology
                .get_location(&RoleName::from_static("Bob"))
                .unwrap(),
            Location::Colocated(RoleName::from_static("Alice"))
        );
        // Carol is declared in the input, so her remote mapping is checked too.
        assert_eq!(
            result
                .topology
                .get_location(&RoleName::from_static("Carol"))
                .unwrap(),
            Location::Remote(TopologyEndpoint::new("remote.host:8080").unwrap())
        );
    }

    /// A lone `min` leaves `max` unset.
    #[test]
    fn test_parse_role_constraints_min_only() {
        let input = r#"
            topology ThresholdSig for Protocol {
                Coordinator: localhost:8000

                role_constraints {
                    Witness: min = 3
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        let constraint = result.topology.role_constraints.get("Witness").unwrap();
        assert_eq!(constraint.min, 3);
        assert_eq!(constraint.max, None);
    }

    /// Both bounds are captured when written as `min, max`.
    #[test]
    fn test_parse_role_constraints_min_and_max() {
        let input = r#"
            topology ThresholdSig for Protocol {
                role_constraints {
                    Witness: min = 3, max = 10
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        let constraint = result.topology.role_constraints.get("Witness").unwrap();
        assert_eq!(constraint.min, 3);
        assert_eq!(constraint.max, Some(10));
    }

    /// The bounds may also be written as `max, min`.
    #[test]
    fn test_parse_role_constraints_max_first() {
        let input = r#"
            topology ThresholdSig for Protocol {
                role_constraints {
                    Worker: max = 5, min = 1
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        let constraint = result.topology.role_constraints.get("Worker").unwrap();
        assert_eq!(constraint.min, 1);
        assert_eq!(constraint.max, Some(5));
    }

    /// Several families can be declared in one `role_constraints` block.
    #[test]
    fn test_parse_role_constraints_multiple_families() {
        let input = r#"
            topology ThresholdSig for Protocol {
                role_constraints {
                    Witness: min = 3
                    Worker: min = 1, max = 10
                    Validator: max = 5
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(result.topology.role_constraints.len(), 3);

        let witness = result.topology.role_constraints.get("Witness").unwrap();
        assert_eq!(witness.min, 3);
        assert_eq!(witness.max, None);

        let worker = result.topology.role_constraints.get("Worker").unwrap();
        assert_eq!(worker.min, 1);
        assert_eq!(worker.max, Some(10));

        let validator = result.topology.role_constraints.get("Validator").unwrap();
        assert_eq!(validator.min, 0); // default min
        assert_eq!(validator.max, Some(5));
    }

    /// Role constraints coexist with mappings and a `constraints` block.
    #[test]
    fn test_parse_role_constraints_with_mappings_and_constraints() {
        let input = r#"
            topology Prod for TwoPhaseCommit {
                Coordinator: coordinator.internal:9000

                role_constraints {
                    Participant: min = 2, max = 100
                }

                constraints {
                    region: Coordinator -> us_east_1
                }
            }
        "#;

        let result = parse_topology(input).unwrap();
        assert_eq!(result.topology.role_constraints.len(), 1);
        assert_eq!(result.topology.constraints.len(), 1);

        let participant = result.topology.role_constraints.get("Participant").unwrap();
        assert_eq!(participant.min, 2);
        assert_eq!(participant.max, Some(100));
    }
}
662}