import pytest
from pattern_core import Pattern, Subject
class TestCompleteWorkflow:
    """End-to-end tests that build a pattern from data, then query or transform it."""

    def test_build_query_transform_workflow(self):
        """Build five labelled subjects under one root, then filter, map and enumerate."""
        subjects = [Subject(f"user-{i}") for i in range(5)]
        for i, s in enumerate(subjects):
            s.add_label("Person")
            s.set_property("name", f"User {i}")
            s.set_property("age", 20 + i)

        patterns = [Pattern.point(s) for s in subjects]
        graph = Pattern.pattern("root", Pattern.from_values(patterns))
        assert graph.length() == 5
        assert not graph.is_atomic()

        # The predicate accepts everything; only a non-None result is checked.
        adults = graph.filter(lambda p: True)
        assert adults is not None

        # Identity map: the element count must be unchanged.
        transformed = graph.map(lambda x: x)
        assert transformed.length() == 5

        # Six values for five elements -- presumably the root value is part of
        # the traversal; confirm against pattern_core.
        values = list(graph.values())
        assert len(values) == 6

    def test_graph_analysis_workflow(self):
        """Assemble a three-person network and check length, size, depth and values."""
        people = []
        for identity, display_name in (
            ("alice", "Alice"),
            ("bob", "Bob"),
            ("charlie", "Charlie"),
        ):
            person = Subject(identity)
            person.add_label("Person")
            person.set_property("name", display_name)
            people.append(person)

        points = [Pattern.point(person) for person in people]
        network = Pattern.pattern("root", Pattern.from_values(points))

        assert network.length() == 3
        assert network.size() >= 3
        depth = network.depth()
        assert depth >= 0

        # Always-true predicate: only checks that any_value returns something.
        has_people = network.any_value(lambda x: True)
        assert has_people is not None

        # Four values for three elements -- presumably root plus elements;
        # confirm against pattern_core.
        count = sum(1 for _ in network.values())
        assert count == 4

    def test_data_transformation_pipeline(self):
        """Turn raw dict records into subjects, wrap them in a pattern and map over it."""
        raw_data = [
            {"id": "1", "value": 10, "type": "A"},
            {"id": "2", "value": 20, "type": "B"},
            {"id": "3", "value": 30, "type": "A"},
        ]

        subjects = []
        for item in raw_data:
            s = Subject(item["id"])
            s.add_label(item["type"])
            s.set_property("value", item["value"])
            subjects.append(s)

        patterns = [Pattern.point(s) for s in subjects]
        dataset = Pattern.pattern("root", Pattern.from_values(patterns))
        transformed = dataset.map(lambda x: x)

        # Count the traversed values once the whole iteration has finished.
        total = sum(1 for _ in dataset.values())
        assert total == 4

        assert dataset.length() == 3
        assert transformed.length() == 3
class TestRealWorldScenarios:
    """Scenario tests: a configuration tree, an event log and a small knowledge graph."""

    def test_configuration_tree(self):
        """Place a root config node beside a nested group of two config nodes."""
        root_cfg = Subject("config-root")
        root_cfg.add_label("Config")
        root_cfg.set_property("version", "1.0")

        db_cfg = Subject("config-db")
        db_cfg.add_label("DatabaseConfig")
        db_cfg.set_property("host", "localhost")
        db_cfg.set_property("port", 5432)

        api_cfg = Subject("config-api")
        api_cfg.add_label("APIConfig")
        api_cfg.set_property("host", "0.0.0.0")
        api_cfg.set_property("port", 8080)

        # Inner group holding the database and API settings.
        section_group = Pattern.pattern(
            "root",
            Pattern.from_values([Pattern.point(db_cfg), Pattern.point(api_cfg)]),
        )
        # Outer tree: the root config point next to the inner group.
        config_tree = Pattern.pattern(
            "root",
            Pattern.from_values([Pattern.point(root_cfg), section_group]),
        )

        assert config_tree.length() == 2
        assert not config_tree.is_atomic()
        tree_depth = config_tree.depth()
        assert tree_depth >= 1

    def test_event_log_processing(self):
        """Create ten alternating events, wrap them in a log, then filter and fold."""
        recorded = []
        for idx in range(10):
            # Even indices are user actions, odd indices are system events.
            kind = "SystemEvent" if idx % 2 else "UserAction"
            entry = Subject(f"event-{idx}")
            entry.add_label("Event")
            entry.set_property("timestamp", idx * 1000)
            entry.set_property("type", kind)
            recorded.append(entry)

        log_points = [Pattern.point(entry) for entry in recorded]
        event_log = Pattern.pattern("root", Pattern.from_values(log_points))
        assert event_log.length() == 10

        # Accept-all predicate: only a non-None result is required.
        kept = event_log.filter(lambda x: True)
        assert kept is not None

        # The fold adds one per visited value; eleven are expected for ten
        # events -- presumably the root value is visited too (confirm).
        visited = event_log.fold(0, lambda acc, x: acc + 1)
        assert visited == 11

    def test_knowledge_graph_fragment(self):
        """Wrap two language subjects and one domain subject under a single root."""
        py_lang = Subject("python")
        py_lang.add_label("Language")
        py_lang.set_property("name", "Python")
        py_lang.set_property("paradigm", "multi-paradigm")

        rust_lang = Subject("rust")
        rust_lang.add_label("Language")
        rust_lang.set_property("name", "Rust")
        rust_lang.set_property("paradigm", "systems")

        web_domain = Subject("web-dev")
        web_domain.add_label("Domain")
        web_domain.set_property("name", "Web Development")

        nodes = [Pattern.point(subject) for subject in (py_lang, rust_lang, web_domain)]
        knowledge_graph = Pattern.pattern("root", Pattern.from_values(nodes))
        assert knowledge_graph.length() == 3

        # Count every value produced by the traversal.
        entity_count = sum(1 for _ in knowledge_graph.values())
        assert entity_count == 4
class TestComonadWorkflow:
    """Tests exercising `extract` and `extend` on patterns."""

    def test_context_aware_transformation(self):
        """Extending with a function that calls `extract` keeps the length unchanged."""
        points = [Pattern.point(number) for number in (1, 2, 3, 4, 5)]
        sequence = Pattern.pattern("root", Pattern.from_values(points))

        assert sequence.extract() is not None

        # Each callback receives a pattern and returns its extracted value.
        extended = sequence.extend(lambda p: p.extract())
        assert extended is not None
        assert extended.length() == sequence.length()

    def test_sliding_window_analysis(self):
        """Extending a numeric series with the identity function preserves its length."""
        samples = [10, 20, 15, 25, 30, 35, 40]
        time_series = Pattern.pattern("root", Pattern.from_values(samples))

        assert time_series.extract() is not None

        mapped = time_series.extend(lambda p: p)
        assert mapped.length() == time_series.length()
class TestCombinationOperations:
    """Tests for `Pattern.combine`."""

    def test_merge_patterns(self):
        """Combining two three-element patterns yields a non-empty, no-shorter result."""
        left = Pattern.pattern("root", Pattern.from_values([1, 2, 3]))
        right = Pattern.pattern("root", Pattern.from_values([4, 5, 6]))

        merged = left.combine(right)

        assert merged is not None
        assert merged.size() > 0
        # The result must be at least as long as the longer input.
        merged_length = merged.length()
        assert merged_length >= max(left.length(), right.length())

    def test_combine_with_subjects(self):
        """Combining two atomic patterns that hold subjects returns a result."""
        first = Subject("subj-1")
        first.add_label("Type1")
        second = Subject("subj-2")
        second.add_label("Type2")

        merged = Pattern.point(first).combine(Pattern.point(second))
        assert merged is not None
class TestErrorRecovery:
    """Tests covering failure handling and basic validation of pattern operations."""

    def test_partial_failure_recovery(self):
        """Map over a five-item dataset; accept a result or an error with a message.

        If `map` raises, the exception must carry a non-empty message. If it
        succeeds, the result must not be None.
        """
        subjects = []
        for i in range(5):
            s = Subject(f"subj-{i}")
            s.add_label("Item")
            subjects.append(s)

        patterns = [Pattern.point(s) for s in subjects]
        dataset = Pattern.pattern("root", Pattern.from_values(patterns))

        try:
            result = dataset.map(lambda x: x)
        except Exception as e:
            # A failing map is tolerated here, but it must explain itself.
            error_msg = str(e)
            assert len(error_msg) > 0
        else:
            # Checked outside the try body: AssertionError is a subclass of
            # Exception, so asserting inside it would be caught by the handler
            # above and a None result could go unnoticed.
            assert result is not None

    def test_validation_workflow(self):
        """Check length, atomicity, value count and mapping of a five-value pattern."""
        p = Pattern.pattern("root", Pattern.from_values([1, 2, 3, 4, 5]))

        assert p.length() == 5
        assert not p.is_atomic()

        # Six values for five elements -- presumably the root value is
        # included; confirm against pattern_core.
        values = list(p.values())
        assert len(values) == 6

        result = p.map(lambda x: x * 2)
        assert result.length() == 5
class TestPerformanceScenarios:
    """Tests that run pattern operations on a large flat input and a deep nesting."""

    def test_large_dataset_processing(self):
        """Map, filter and fold over a pattern built from 1000 integers."""
        size = 1000
        numbers = list(range(size))
        pattern = Pattern.pattern("root", Pattern.from_values(numbers))
        assert pattern.length() == size

        scaled = pattern.map(lambda x: x * 2)
        assert scaled.length() == size

        # Keeping only even values can never produce more than `size` items.
        even_items = pattern.filter(lambda x: x % 2 == 0)
        assert len(even_items) <= size

        # The fold adds one per visited value; one more than `size` is
        # expected -- presumably the root value is visited too (confirm).
        visited = pattern.fold(0, lambda acc, x: acc + 1)
        assert visited == size + 1

    def test_deep_structure_traversal(self):
        """Wrap a single point in 50 nested levels and check depth and extraction."""
        depth = 50
        nested = Pattern.point(42)
        for level in range(depth):
            # Each pass wraps the previous pattern as the sole element.
            nested = Pattern.pattern(f"level-{level}", [nested])

        assert nested.depth() >= depth
        assert nested.extract() is not None
if __name__ == "__main__":
    # Run this file's tests verbosely when executed directly, and propagate
    # pytest's exit code so a failing run is visible to the caller (shell, CI).
    raise SystemExit(pytest.main([__file__, "-v"]))